In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark=SparkSession.builder.appName("DataWrangling").getOrCreate()

In [None]:
arch = "D:\\Universidad\\Semestre_8\\\Big Data\\movie_data_tmdb.csv"

In [None]:
df = spark.read.format("csv")\
.option("inferSchema", False)\ # no infiere los tipos de datos
.option("header", True)\
.option("sep", "|")
.load(arch)



In [None]:
df.printSchema()
# Visualiza las columnas, sus tipos de datos y si son nullable

# Salida:
# root
# |-- adult: string (nullable = true)
# |-- backdrop_path: string (nullable = true)
# |-- belongs_to_collection: string (nullable = true)
# ........

In [None]:
df.dtypes
# desglosa la columna por nombre y tipos

# Salida:
# [('adult', 'string'),
#  ('backdrop_path', 'string'),
#  ('belongs_to_collection', 'string'),
#  ('budget', 'string'),
#  ......]

In [None]:
df.columns
# nombre de las columnas

# Salida:
# ['adult',
#  'backdrop_path',
#  'belongs_to_collection',
#  'budget',
#  'genres',
#  ....]

In [None]:
df.count()

# Salida:
# 120127

In [None]:
cols = ['id', 'budget', 'popularity', 'release_date', 'revenue', 'title']

In [None]:
subconjunto = df.select(*cols)

In [None]:
subconjunto.show()
# Muestra una tabla con especificas columnas

In [None]:
df.show()
# Muestra los datos pero sin ordern

In [None]:
subconjunto.show(10, False) #False no sobreescibe
# Muestra informacion en una tabla

In [None]:
# Para calcular los valores faltantes en una columan o en
# una columan o en varias
# utilizando funciones de Spark
from pyspark.sql.function import *

In [None]:
df.filtrer((df['popularity']=='')| df['popularity']\
           .isNull()|isnan(df['popularity'])).count()

# Salida
# 1059

In [None]:
# Calcular todos los valores faltantes en el DF
# alias genera un nuevo dataset con un nuevo "nombre": c =
#

df.select([count(when((col(c)=='') | 
                      col(c).isNull() | isnan(c),c)
                 .alias(c) for c in df.columns]).show()
           


In [None]:
# Calcular las frecuencias por categoria(columna)
df.groupBy(df['title']).count().show()

In [None]:
# Número de veces que aparece en el dataset
# Ordenados
df.groupBy(df['title']).count().sort(desct('count')).show(10, False)

In [None]:
# Creando un subconjunto temporal del DF
# Elimina cualquier valor faltante
temp = df.filtrer((df['title']!='') & (df['title'].isNotNull()) & ('tilde'isnan(df['title'])))

In [None]:
temp.groupby(temp['title']).count().filtrer(" 'count' > 4 ").sort(col("count").desc()).show(10,False)

In [None]:
temp.groupby(temp['title']).count().filtrer(" 'count' >= 4 ").sort(col("count").desc()).show()

# Salida:
# <bound method DataFrame.show of DataFrame[title: string, count: bigint]>

In [None]:
# Borrar los DF
del temp

In [None]:
df.dtypes

# Salida:
# [('adult', 'string'),
#  ('backdrop_path', 'string'),
#  ('belongs_to_collection', 'string'),
#  ('budget', 'string'),
#  ......]

In [None]:
# Convertir budget a flotante
df = df.withColumn("budget", df['budget'].cast("float"))

In [None]:
from pyspark.sql.types import *

In [None]:
int_vars=["id"]
float_vars=["budget", "popularity", "revenue"]
date_vars=["release_date"]

In [None]:
for c in int_vars:
    df = df.withColumn(c, df[c].cast(IntegerType()))
    
for c in float_vars:
    df = df.withColumn(c, df[c].cast(FloatType()))
    
for c in date_vars:
    df = df.withColumn(c, df[c].cast(DateType()))

In [None]:
df.dtypes

# Salida:
# [('budget', 'float'),
#  ('revenue', 'float'),
#  ('id', 'Integer'),
#  ('popularity', 'float'),
#  ('release_date', 'Date')
# Conversion de los tipos de los datos, asi no sale solo la salida

In [None]:
df.show(10, False)

In [None]:
#Estadisticas en el dataset
df.describe()

In [None]:
#Calcular la mediana pero primero hay que eliminar
# los valores que sean nulos
# isnan functions de pyspark
df_temp = df.filtrer((df["budget"]!=0) & (df["budget"].isNotNull()) & ('tilde'isnan(df["budget"])))

In [None]:
#approxQuantile ( ColumnaNumerica, probabilidad (0, minimo)(0.5, mediana), (1,max)), error_relativo)
mediana = df_temp.approxQuantile("budget", [0.5], [0.1])

In [None]:
print("Mediana de la columna budget: ", str(mediana))

# Salida
# Mediana de la columna budget:  [3700000.0]

In [None]:
# agg = groupBy().agg() crea un subconjunto de datos
# Valores unicos
df.agg(countDistinct(col("title")).alias("count")).show()

# Salida:
# count:107973
# Sale en una tablita

In [None]:
# distinct elimina valores duplicados sobre una columna
df.select("title").distinct().show(10, False)

# Sale los titulos en una tablita

In [None]:
# Extraer el año de release_date: year, month, dayofmonth
df_temp = df.withColumn("release_year", year("release_date"))

In [None]:
df_temp.groupBy("release_year").agg(countDistinct("title")).show(10, False)

# Sale esas columnas en una tablita

In [None]:
# Filtrando aquellos peliculas que inician con "Meet"
df.filtrer(df["title"].like("Meet%")).show(10,False)

In [None]:
# Filtrar las peliculas que su nombre no terminen en "s"
df.filtrer("tilde"df["title"].like("%s")).show(10,False)

In [None]:
# Creando nuevas columnas
# 1. Calcular la media
# collect trae todos los elementos de los nodos de datos, los junta y se los paso al drive node
meanPop = df.agg({"popularity:mean"}).collect()[0]["avg(popularity)"]

In [None]:
cuentaPop = df.count()

In [None]:
# lit() agrega una nueva columna y le coloca un valor literal o constante
df = df.withColumn("mean_popularity", lit(meanPop))

In [None]:
df = df.withColumn("variance", pow((df["popularity"]-df["mean_popularity"]),2))

In [None]:
variance_sum = df.agg({"variance:sum"}).collect()[0]["sum(variance)"]

In [None]:
variance_popu = variance_sum/(cuentaPop-1)

In [None]:
print(variance_popu)

# Salida
# 238.7163979...

In [None]:
# Mismo trabajo que en el anterior pero utilizando multiples variables

def new_cols(budget, popularity):
    if budget < 1000000: budget_cat="Small"
    elif budget < 1000000: budget_cat="Medium"
    else: budget_cat = "Big"
    if popularity<3: ratings="Low"
    elif popularity<5: ratings="Mid"
    else ratings="High"
    return budget_cat, ratings

In [None]:
# Utilizamos la funcion sobre el DF
# udf() es una user-defined function
udfB = udf(new_cols, StructType([StructField("budget_cat", StringType(), 
                                             True), StructField("ratings", StringType(), True]))