# Data Processing using PySpark

In [None]:
# Google Colab setup for Spark/PySpark: mount Google Drive at /content/gdrive
# so files stored in Drive can be read through the local filesystem.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os

# Point the process at the Java and Spark installations used by this notebook.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.3.1-bin-hadoop3",
    }
)

In [None]:
# Initialise findspark before importing pyspark, then start a session.
import findspark

findspark.init()

from pyspark.sql import SparkSession

# master("local[*]") runs Spark locally inside this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Keep a handle on the underlying SparkContext as well.
sc = spark.sparkContext

In [None]:
# Display the SparkSession object to confirm it was created.
spark

In [None]:
# Display the SparkContext obtained from the session.
sc

In [None]:
# Load the CSV dataset from the mounted Drive folder. The first row is used as
# the header and column types are inferred from the data.
# (An S3 location would be passed the same way, e.g. 's3://<bucket/dir>/sample_data.csv'.)
df = spark.read.csv(
    '/content/gdrive/MyDrive/st0263-2266/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# List the column names of the DataFrame.
df.columns

In [None]:
# Print the schema: each column with its data type.
df.printSchema()

In [None]:
# Project five columns and preview the first 5 rows.
df.select(
    'Nombre departamento',
    'Edad',
    'Sexo',
    'Estado',
    'Fecha de diagnóstico',
).show(5)

In [None]:
# Rename a column: show the municipality name under the alias 'Mup'.
# The column is spelled 'Nombre municipio', matching the groupBy and SQL cells
# of this notebook, so the lookup does not rely on Spark's case-insensitive
# column resolution.
df.select('Nombre municipio', 'Edad').withColumnRenamed('Nombre municipio', 'Mup').show(5)

In [None]:
# Add a derived column: the value of 'Edad' plus 20, shown for the first 10
# rows without truncating cell contents.
(
    df.select('ID de caso', 'Fecha de recuperación', 'Edad')
    .withColumn("Edad luego 20 años", df["Edad"] + 20)
    .show(10, truncate=False)
)

In [None]:
# Drop columns: select four columns, then remove the two date columns again.
# The names passed to drop() are spelled exactly as in select(). drop() ignores
# names it cannot match, so a case mismatch would silently keep the column
# whenever case-sensitive resolution is enabled.
df.select('fecha reporte web', 'ID de caso', 'Fecha de notificación', 'Nombre departamento').drop('fecha reporte web', 'Fecha de notificación').show(5)

In [None]:
# Keep only the rows whose department is 'BOGOTA' and preview 5 of them.
is_bogota = df['Nombre departamento'] == 'BOGOTA'
df.where(is_bogota).show(5)

In [None]:
# UDF: derive an age-group label from the 'Edad' column.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def age_group(age):
    """Classify an age as "Joven" (30 or younger) or "Adulto" (older than 30).

    Returns None when the age is missing, so rows with a null 'Edad' produce a
    null label instead of raising TypeError inside the UDF.
    """
    if age is None:
        return None
    # The original lambda had these two labels swapped.
    return "Joven" if age <= 30 else "Adulto"


# Wrap the plain Python function as a Spark UDF returning a string.
age_udf = udf(age_group, StringType())
# Apply the UDF to the DataFrame and show the first 10 rows untruncated.
df.withColumn("Grupo de edad", age_udf(df.Edad)).show(10,False)

In [None]:
# Departments with the most COVID-19 cases: count rows per department and
# list the 10 largest counts first.
(
    df.groupBy('Nombre departamento')
    .count()
    .sort('count', ascending=False)
    .show(10, truncate=False)
)

In [None]:
# Municipalities with the most COVID-19 cases: count rows per municipality and
# list the 10 largest counts first.
(
    df.groupBy('Nombre municipio')
    .count()
    .sort('count', ascending=False)
    .show(10, truncate=False)
)

In [None]:
# Notification dates with the most COVID-19 cases: count rows per date and
# list the 10 largest counts first.
(
    df.groupBy('Fecha de notificación')
    .count()
    .sort('count', ascending=False)
    .show(10, truncate=False)
)

In [None]:
# COVID-19 cases per age: count rows per 'Edad' value, ordered by age ascending.
(
    df.groupBy('Edad')
    .count()
    .sort('Edad', ascending=True)
    .show()
)

In [None]:
# COVID-19 cases per country: count rows per 'Nombre del país', ordered
# alphabetically by country name.
(
    df.groupBy('Nombre del país')
    .count()
    .sort('Nombre del país', ascending=True)
    .show()
)

In [None]:
# Spark SQL setup: load the COVID-19 CSV and expose it as the temporary view
# "covid" so it can be queried with SQL.
from pyspark.sql import SparkSession

# Obtain a SparkSession through getOrCreate(), requesting this application name.
spark = SparkSession.builder.appName("SparkSQLExampleApp").getOrCreate()

# Location of the dataset on the mounted Drive.
csv_file = "/content/gdrive/MyDrive/st0263-2266/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv"

# Read the file with a header row and an inferred schema (for larger files an
# explicit schema may be preferable), then register the temporary view.
df = (
    spark.read
    .format("csv")
    .options(inferSchema="true", header="true")
    .load(csv_file)
)
df.createOrReplaceTempView("covid")

In [None]:
# Departments with the most COVID-19 cases, via Spark SQL.
# ORDER BY 2 sorts on the second selected item, i.e. the COUNT(*) column.
department_counts_sql = """SELECT `Nombre departamento`, COUNT(*) FROM covid GROUP BY `Nombre departamento` ORDER BY 2  DESC"""
spark.sql(department_counts_sql).show(10)

In [None]:
# Municipalities with the most COVID-19 cases, via Spark SQL.
# ORDER BY 2 sorts on the second selected item, i.e. the COUNT(*) column.
municipality_counts_sql = """SELECT `Nombre municipio`, COUNT(*) FROM covid GROUP BY `Nombre municipio` ORDER BY 2  DESC"""
spark.sql(municipality_counts_sql).show(10)

In [None]:
# Notification dates with the most COVID-19 cases, via Spark SQL.
# ORDER BY 2 sorts on the second selected item, i.e. the COUNT(*) column.
date_counts_sql = """SELECT `Fecha de notificación`, COUNT(*) FROM covid GROUP BY `Fecha de notificación` ORDER BY 2  DESC"""
spark.sql(date_counts_sql).show(10)

In [None]:
# COVID-19 cases per age, via Spark SQL.
# ORDER BY 1 sorts on the first selected item, i.e. `Edad`, ascending.
age_counts_sql = """SELECT `Edad`, COUNT(*) FROM covid GROUP BY `Edad` ORDER BY 1 ASC"""
spark.sql(age_counts_sql).show(20)

In [None]:
# COVID-19 cases per country, via Spark SQL, with the largest counts first.
# ORDER BY 2 sorts on the second selected item, i.e. the COUNT(*) column.
country_counts_sql = """SELECT `Nombre del país`, COUNT(*) FROM covid GROUP BY `Nombre del país` ORDER BY 2  DESC"""
spark.sql(country_counts_sql).show(10)