In [1]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
findspark.init()
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
%%html
<style>
.output_subarea.output_text.output_stream.output_stdout > pre {
  width:max-content;
}
.p-Widget.jp-RenderedText.jp-OutputArea-output > pre {
  width:max-content;
}
</style>

In [None]:
rdd=sc.parallelize([item for item in range(10)]).map(lambda x: (x, x**2))
rdd.collect()

In [None]:
df= rdd.toDF(["numero","cuadrado"])

df.printSchema()

In [None]:
df.show()

In [None]:
rdd1=sc.parallelize([(1, "Jose", 35.5),(2,"Teresa",54.3),(3,"Katia", 12.7)])

In [None]:
#Via uno

esquema1= StructType(
    [
     StructField("id", IntegerType(), True),
     StructField("nombre", StringType(), True),
     StructField("saldo", DoubleType(), True),
    ]
)

#Via dos

esquema2="`id` INT, `nombre` STRING, `saldo` DOUBLE"

df1=spark.createDataFrame(rdd1, schema=esquema1)

df1.printSchema()

In [None]:
df1.show()

In [None]:
# Crear un DataFrame mediante la lectura de un archivo de texto

df = spark.read.text('./data/dataTXT.txt')

df.show()

df.show(truncate=False)

In [None]:
# Crear un DataFrame mediante la lectura de un archivo csv

df1 = spark.read.csv('./data/dataCSV.csv')

df1.show()


In [None]:

df1 = spark.read.option('header', 'true').csv('./data/dataCSV.csv')

df1.show()

In [None]:

df2 = spark.read.option('header', 'true').option('delimiter', '|').csv('./data/dataTab.txt')

df2.show()

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

json_schema =  StructType(
    [
     StructField('color', StringType(), True),
     StructField('edad', IntegerType(), True),
     StructField('fecha', DateType(), True),
     StructField('pais', StringType(), True)
    ]
)

df4 = spark.read.schema(json_schema).json('./data/dataJSON.json')

df4.show()

In [None]:
df4.printSchema()

In [None]:
df5 = spark.read.parquet('./data/dataPARQUET.parquet')

df5.show()

In [None]:

# Otra alternativa para leer desde una fuente de datos parquet en este caso

df6 = spark.read.format('parquet').load('./data/dataPARQUET.parquet')

df6.printSchema()

In [None]:
df = spark.read.parquet('./data/dataPARQUET.parquet')

df.printSchema()


In [None]:
# Primera alternativa para referirnos a las columnas

df.select('title').show()

In [None]:

df.select(col('title'), col('likes')).show()


In [None]:
# select

df = spark.read.parquet('./data/datos.parquet')

df.printSchema()

In [None]:
from pyspark.sql.functions import col

df.select(col('video_id')).show()


In [None]:
df.select('video_id', 'trending_date').show()

In [None]:
# Esta vía nos dará error

df.select(
    'likes',
    'dislikes',
    ('likes' - 'dislikes')
).show()

In [None]:
df.select(
    col('likes'),
    col('dislikes'),
    (col('likes') - col('dislikes')).alias('aceptacion')
).show()


In [None]:
# selectExpr
df.selectExpr('likes', 'dislikes', '(likes - dislikes) as aceptacion').show()

In [None]:
df.selectExpr("count(distinct(video_id)) as videos").show()

In [None]:
df_prueba = spark.read.parquet('./data/datos.parquet')
df_prueba2=df_prueba.select('title').filter(col('title').like('%WE WANT TO TALK%'))
df_prueba2.show()

In [None]:
#Filter
df = spark.read.parquet('./data/datos.parquet')
df.show( )



In [None]:
df.filter(col('video_id') == "2kyS6SvSYSE").show()

In [None]:
#Where
df1 = spark.read.parquet('./data/datos.parquet').where(col("trending_date") != '17.14.11')

In [None]:
df1.show()

In [None]:
df2=spark.read.parquet('./data/datos.parquet').where(col("likes")>5000)

In [None]:
df2.filter(col("trending_date") != '17.14.11').filter(col("likes")<7000)

In [None]:
#distinc
df = spark.read.parquet('./data/datos.parquet')
df_duplicados=df.distinct()

In [None]:
print("El conteo del dataframe original es {}".format(df.count()))
print("El conteo del dataframe original es {}".format(df_duplicados.count()))


In [None]:
#dropDuplicate (se le puede indicar cual columnas debe tomar para eliminar los duplicados)

dataframe = spark.createDataFrame([(1, 'azul', 567), (2, 'rojo', 487), (1, 'azul', 345), (2, 'verde', 783)]).toDF('id', 'color', 'importe')

dataframe.show()

In [None]:
dataframe.dropDuplicates(['id', 'color']).show()

In [None]:
df = (spark.read.parquet('./data/datos.parquet')
    .select(col('likes'), col('views'), col('video_id'), col('dislikes'))
    .dropDuplicates(['video_id'])
)

df.show()

In [None]:
#sort
df.sort('likes').show()

In [None]:
from pyspark.sql.functions import desc

df.sort(desc('likes')).show()

In [None]:
# función orderBy

df.orderBy(col('views')).show()df.orderBy(col('views').desc()).show()

In [None]:
df.orderBy(col('views').desc()).show()

In [15]:
dataframe = spark.createDataFrame([(1, 'azul', 568), (2, 'rojo', 235), (1, 'azul', 456), (2, 'azul', 783)]).toDF('id', 'color', 'importe')

dataframe.show()

+---+-----+-------+
| id|color|importe|
+---+-----+-------+
|  1| azul|    568|
|  2| rojo|    235|
|  1| azul|    456|
|  2| azul|    783|
+---+-----+-------+



In [20]:
#primero ordena por colores en orden alfabetico, y los que son azules (iguales) los ordena por el importe
dataframe.orderBy(col('color').desc(), col('importe')).show()

+---+-----+-------+
| id|color|importe|
+---+-----+-------+
|  2| rojo|    235|
|  1| azul|    456|
|  1| azul|    568|
|  2| azul|    783|
+---+-----+-------+



In [30]:
# funcion limit 

top_10 = df.orderBy(col('views').desc()).limit(10)

top_10.show()

+-------+--------+-----------+--------+
|  likes|   views|   video_id|dislikes|
+-------+--------+-----------+--------+
| 609101|48431654|-BQJo3vK8O8|   52259|
|3880071|39349927|7C2z4GqqS5E|   72707|
|1111592|38873543|i0p1bmr0EmE|   96407|
|1735895|37736281|6ZfuNTqbHE8|   21969|
|1634124|33523622|2Vv-BfVoq4g|   21082|
|1405355|31648454|VYOjWnS4cMY|   51547|
| 850362|27973210|u9Mv98Gr5pY|   26541|
|1149185|24782158|FlsCjmMhFmw|  483924|
| 641546|24421448|U9BwWKXjVaI|   16517|
| 587326|23758250|1J76wN0TPI4|   18799|
+-------+--------+-----------+--------+



In [5]:
# withColumn
df = spark.read.parquet('./data/datos.parquet')


In [6]:
df_valoracion = df.withColumn('valoracion', col('likes') - col('dislikes'))

df_valoracion.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)
 |-- valoracion: integer (nullable = true)



In [7]:
df_valoracion1 = (df.withColumn('valoracion', col('likes') - col('dislikes'))
                    .withColumn('res_div', col('valoracion') % 10)
)

df_valoracion1.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)
 |-- valoracion: integer (nullable = true)
 |-- res_div: integer (nullable = true)



In [8]:
df_valoracion1.select(col('likes'), col('dislikes'), col('valoracion'), col('res_div')).show()


+------+--------+----------+-------+
| likes|dislikes|valoracion|res_div|
+------+--------+----------+-------+
| 57527|    2966|     54561|      1|
| 97185|    6146|     91039|      9|
|146033|    5339|    140694|      4|
| 10172|     666|      9506|      6|
|132235|    1989|    130246|      6|
|  9763|     511|      9252|      2|
| 15993|    2445|     13548|      8|
| 23663|     778|     22885|      5|
|  3543|     119|      3424|      4|
| 12654|    1363|     11291|      1|
|   655|      25|       630|      0|
|  1576|     303|      1273|      3|
|114188|    1333|    112855|      5|
|  7848|    1171|      6677|      7|
|  7473|     246|      7227|      7|
|  9419|      52|      9367|      7|
|  8011|     638|      7373|      3|
|  5398|      53|      5345|      5|
| 11963|      36|     11927|      7|
|  8421|     191|      8230|      0|
+------+--------+----------+-------+
only showing top 20 rows



In [9]:
# withColumnRenamed

df_renombrado = df.withColumnRenamed('video_id', 'id')

df_renombrado.printSchema()

root
 |-- id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



In [10]:

df_error = df.withColumnRenamed('nombre_que_no_existe', 'otro_nombre')

df_error.printSchema()# drop

df.printSchema()

df_util = df.drop('comments_disabled')

df_util.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



In [11]:
# drop

df.printSchema()

df_util = df.drop('comments_disabled')

df_util.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)


In [12]:
df_util = df.drop('comments_disabled', 'ratings_disabled', 'thumbnail_link')

df_util.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



In [13]:

df_util = df.drop('comments_disabled', 'ratings_disabled', 'thumbnail_link', 'cafe')

df_util.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



In [14]:
# sample

df_muestra = df.sample(0.8)

num_filas = df.count()
num_filas_muestra = df_muestra.count()

print('El 80% de filas del dataframe original es {}'.format(num_filas - (num_filas*0.2)))
print('El numero de filas del dataframe muestra es {}'.format(num_filas_muestra))


El 80% de filas del dataframe original es 38509.6
El numero de filas del dataframe muestra es 38579


In [15]:
df_muestra = df.sample(fraction=0.8, seed=1234)

df_muestra = df.sample(withReplacement=True, fraction=0.8, seed=1234)

In [16]:
# randomSplit

train, test = df.randomSplit([0.8, 0.2], seed=1234)

train, validation, test = df.randomSplit([0.6, 0.2, 0.2], seed=1234)

train.count()

validation.count()

test.count()

9595