# Data Processing using PySpark

In [None]:

# Create (or reuse) a Spark session and keep a handle to its SparkContext.
from pyspark.sql import SparkSession
# "local[*]" is the master URL handed to the builder; getOrCreate() returns
# the already-active session if there is one instead of building a new one.
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [None]:
# Bare expression: the notebook renders the session object as cell output.
spark

In [None]:
# Bare expression: the notebook renders the SparkContext as cell output.
sc

In [None]:
# Load the CSV dataset into the DataFrame `df` used by every cell below.
# header=True takes the column names from the first line of the file;
# inferSchema=True asks Spark to infer column types from the data.
# Alternative S3 source, kept for reference:
#df=spark.read.csv('s3://<bucket/dir>/sample_data.csv',inferSchema=True,header=True)
df=spark.read.csv("gdrive/MyDrive/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-1K.csv",inferSchema=True,header=True)

In [None]:
# List the column names of the DataFrame.
df.columns

In [None]:
# Number of columns.
len(df.columns)

In [None]:
# Number of records (rows) in the DataFrame.
df.count()

In [None]:
# Shape of the dataset as a (rows, columns) tuple.
# Note: this calls df.count() again rather than reusing the previous result.
print((df.count(),len(df.columns)))

In [None]:
# Print the column names and data types of the DataFrame.
df.printSchema()

In [None]:
# Show the first 5 rows of the DataFrame.
df.show(5)

In [None]:

# Select only two columns and show their first 5 rows.
df.select('Nombre departamento','Nombre municipio').show(5)

In [None]:
# Summary statistics of the DataFrame's columns.
df.describe().show()

In [None]:
from pyspark.sql.types import StringType,DoubleType,IntegerType

In [None]:
# withColumn: derive a new column "Edad_after_10_yrs" = Edad + 10.
# The result is only displayed (10 rows, second argument False = do not
# truncate cell contents); it is not assigned, so `df` itself is unchanged.
df.withColumn("Edad_after_10_yrs",(df["Edad"]+10)).show(10,False)

In [None]:
# withColumn with a cast: add "Edad_double" holding Edad converted to double.
df.withColumn('Edad_double',df['Edad'].cast(DoubleType())).show(10,False)

In [None]:
# Same expression as the first withColumn cell above (repeated example).
df.withColumn("Edad_after_10_yrs",(df["Edad"]+10)).show(10,False)

In [None]:
# Filter: keep only the rows whose 'Tipo de contagio' equals 'Importado'.
df.filter(df['Tipo de contagio']=='Importado').show()

In [None]:
# Filter, then project: same row filter, showing only three columns.
df.filter(df['Tipo de contagio']=='Importado').select('Edad','Sexo','Tipo de contagio').show()

In [None]:
# Multiple conditions by chaining filter() calls: a row must pass both.
df.filter(df['Tipo de contagio']=='Importado').filter(df['ID de caso'] >10).show()

In [None]:
# Multiple conditions in a single filter(): combine them with `&`.
# Each comparison is wrapped in its own parentheses before being combined.
df.filter((df['Tipo de contagio']=='Importado')&(df['ID de caso'] >10)).show()

In [None]:
# Distinct values in the 'Tipo de contagio' column.
df.select('Tipo de contagio').distinct().show()

In [None]:
# Number of distinct values in the 'Tipo de contagio' column.
df.select('Tipo de contagio').distinct().count()

In [None]:
# Row count per 'Tipo de contagio' value (first 5 groups, untruncated).
df.groupBy('Tipo de contagio').count().show(5,False)

In [None]:
# Value counts: row count per group, sorted by the 'count' column descending.
df.groupBy('Tipo de contagio').count().orderBy('count',ascending=False).show(5,False)

In [None]:
# Per-group mean; called with no column arguments, so it is computed for the
# numeric columns of the DataFrame.
df.groupBy('Tipo de contagio').mean().show(5,False)

In [None]:
# Per-group sum of the numeric columns.
df.groupBy('Tipo de contagio').sum().show(5,False)

In [None]:
# Per-group maximum of the numeric columns.
df.groupBy('Tipo de contagio').max().show(5,False)

In [None]:
# Per-group minimum of the numeric columns.
df.groupBy('Tipo de contagio').min().show(5,False)

In [None]:
# Aggregation with an explicit {column: function} mapping:
# sum of 'ID de caso' for each 'Tipo de contagio' group.
df.groupBy('Tipo de contagio').agg({'ID de caso':'sum'}).show(5,False)

In [None]:
# UDF
from pyspark.sql.functions import udf


In [None]:
#normal function
def price_range(brand):
    """Map a label onto one of three summary categories.

    Despite its name, this notebook applies the function to the
    'Tipo de contagio' column through a UDF.

    Args:
        brand: The label to classify.

    Returns:
        'Comunitaria o Relacionado' for 'Comunitaria' or 'Relacionado',
        'Importado' for 'Importado', and 'En estudio' for anything else.
    """
    merged_labels = ('Comunitaria', 'Relacionado')
    if brand in merged_labels:
        return 'Comunitaria o Relacionado'
    if brand == 'Importado':
        return 'Importado'
    # Every other value falls into the catch-all category.
    return 'En estudio'

In [None]:
# Wrap the plain Python function price_range as a Spark UDF that returns a
# string column.
brand_udf=udf(price_range,StringType())
# Apply the UDF to 'Tipo de contagio' and show the result as a new
# 'price_range' column (displayed only; `df` is not reassigned).
df.withColumn('price_range',brand_udf(df['Tipo de contagio'])).show(10,False)

In [None]:
# UDF built from a lambda: label each age as "young" (<= 30) or "senior".
# A null Edad reaches the lambda as None; comparing None with an int raises
# TypeError in Python 3 and would fail the whole job, so nulls are passed
# through as null instead.
age_udf = udf(
    lambda Edad: None if Edad is None else ("young" if Edad <= 30 else "senior"),
    StringType(),
)
# Apply the UDF to the Edad column and show the result as 'age_group'.
df.withColumn("age_group", age_udf(df.Edad)).show(10,False)

In [None]:
#pandas udf
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [None]:
#create python function
def remaining_yrs(age):
    """Return the number of years left until age 100.

    Args:
        age: Age in years, or None for a missing value.

    Returns:
        ``100 - age``, or None when ``age`` is None.
    """
    # When used as a Spark UDF, a null column value arrives as None;
    # propagate it rather than letting ``100 - None`` raise TypeError.
    if age is None:
        return None
    return 100 - age

In [None]:
# Wrap remaining_yrs as a regular (row-at-a-time) Spark UDF returning an
# integer column. Note: this uses `udf`, not `pandas_udf`.
length_udf = udf(remaining_yrs, IntegerType())
# Apply the UDF to the Edad column and show the result as 'yrs_left'.
df.withColumn("yrs_left", length_udf(df['Edad'])).show(10,False)

In [None]:
#udf using two columns
def prod(rating,exp):
    """Return the product of two values.

    Args:
        rating: First factor, or None for a missing value.
        exp: Second factor, or None for a missing value.

    Returns:
        ``rating * exp``, or None when either argument is None.
    """
    # When used as a Spark UDF, null column values arrive as None;
    # propagate null instead of raising TypeError on the multiplication.
    if rating is None or exp is None:
        return None
    return rating * exp

In [None]:
# Wrap the two-argument function prod as a regular Spark UDF declared to
# return a double column.
prod_udf = udf(prod, DoubleType())
# Apply the UDF to two columns. The inputs are cast to double first so that
# prod returns a Python float matching the declared DoubleType; a regular
# Python UDF that returns an int for a DoubleType column produces null.
# (Assumes 'Edad' and 'ID de caso' were inferred as integer columns - confirm
# with df.printSchema().)
df.withColumn(
    "product",
    prod_udf(df['Edad'].cast(DoubleType()), df['ID de caso'].cast(DoubleType())),
).show(10,False)

In [None]:
# Record count before removing duplicate rows.
df.count()

In [None]:
# Drop duplicate rows; `df` is rebound, so all later cells use the
# de-duplicated DataFrame.
df=df.dropDuplicates()

In [None]:
# Record count after de-duplication, to compare with the count above.
df.count()

In [None]:
# Drop the 'Tipo de contagio' column into a new DataFrame; `df` keeps it.
df_new=df.drop('Tipo de contagio')

In [None]:
# Show the first 10 rows of the DataFrame without the dropped column.
df_new.show(10)

In [None]:
# Saving the DataFrame as CSV

In [None]:
# Target location for the CSV output.
write_uri='s3://jupyterjorge/trusted/datasets/covid19/results'

In [None]:
# Write the de-duplicated `df` (not `df_new`) as CSV with a header row.
# coalesce(1) reduces the data to one partition before writing.
# NOTE(review): Spark is expected to write a directory at write_uri holding
# the part file, not a file with that exact name - confirm.
# NOTE(review): no save mode is set, so re-running this cell is expected to
# fail if the target already exists - confirm whether that is intended.
df.coalesce(1).write.format("csv").option("header","true").save(write_uri)

In [None]:
# Saving the DataFrame as Parquet

In [None]:
# Target location for the Parquet output.
parquet_uri='s3://jupyterjorge/trusted/datasets/covid19/results_parquet'

In [None]:
# Write `df` in Parquet format (no coalesce here, so the current partitioning
# is kept; no save mode is set either).
df.write.format('parquet').save(parquet_uri)