# Data Processing using PySpark

In [None]:
# Google Colab setup: mount Google Drive so the gdrive/MyDrive/... paths used below are readable
from google.colab import drive
drive.mount(mountpoint='/content/gdrive')

In [None]:
#instalar java y spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
# Tell findspark / PySpark where the JDK and the unpacked Spark distribution live
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
})

In [None]:
# verificar que tenga previamente el paquete 'pyspark' instalado
!pip install pyspark
# en el cluster EMR no hay necesidad de instalar este paquete, ya viene con AWS EMR / Notebooks

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
# en el cluster EMR no hay necesidad de hacer esto, ya viene con AWS EMR / Notebooks
spark = SparkSession.builder.master("local[*]").getOrCreate()
#create spark session object
#spark=SparkSession.builder.appName('data_processing').getOrCreate()
# en el cluster EMR no hay necesidad de hacer esto, ya viene con AWS EMR / Notebooks
sc = spark.sparkContext

In [None]:
# acceder datos en S3 desde spark local
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.awsAccessKeyId", '')
hadoop_conf.set("fs.s3n.awsSecretAccessKey", '')
hadoop_conf.set("fs.s3n.awsSessionToken", '')

In [None]:
# Load csv Dataset 
# desde gdrvie
df=spark.read.csv('gdrive/MyDrive/st1612_20221/datasets/sample_data.csv',inferSchema=True,header=True)

# desde local
#df=spark.read.csv('../datasets/sample_data.csv',inferSchema=True,header=True)
# desde S3
#df=spark.read.csv('s3n://st1612datasets/otros/sample_data.csv',inferSchema=True,header=True)


In [None]:
#columns of dataframe
df.columns

In [None]:
#check number of columns
len(df.columns)

In [None]:
#number of records in dataframe
df.count()

In [None]:
#shape of dataset
print((df.count(),len(df.columns)))

In [None]:
#printSchema
df.printSchema()

In [None]:
#fisrt few rows of dataframe
df.show(5)

In [None]:
#select only 2 columns
df.select('age','mobile').show(5)

In [None]:
#info about dataframe
df.describe().show()

In [None]:
from pyspark.sql.types import StringType,DoubleType,IntegerType

In [None]:
#add column
df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)

In [None]:
#add column
df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)

In [None]:
#add column
df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)

In [None]:
#filter the records 
df.filter(df['mobile']=='Vivo').show()

In [None]:
#filter the records 
df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()

In [None]:
#filter the multiple conditions
df.filter(df['mobile']=='Vivo').filter(df['experience'] >10).show()

In [None]:
#filter the multiple conditions
df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()

In [None]:
#Distinct Values in a column
df.select('mobile').distinct().show()

In [None]:
#distinct value count
df.select('mobile').distinct().count()

In [None]:
df.groupBy('mobile').count().show(5,False)

In [None]:
# Value counts
df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)

In [None]:
# Value counts
df.groupBy('mobile').mean().show(5,False)

In [None]:
df.groupBy('mobile').sum().show(5,False)

In [None]:
# Value counts
df.groupBy('mobile').max().show(5,False)

In [None]:
# Value counts
df.groupBy('mobile').min().show(5,False)

In [None]:
#Aggregation
df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)

In [None]:
# UDF
from pyspark.sql.functions import udf


In [None]:
#plain Python function, wrapped as a Spark UDF further down
def price_range(brand):
    """Bucket a mobile brand into a price category.

    'Samsung' or 'Apple' -> 'High Price'; 'MI' -> 'Mid Price';
    anything else (including None) -> 'Low Price'.
    """
    premium_brands = ['Samsung', 'Apple']
    if brand in premium_brands:
        return 'High Price'
    if brand == 'MI':
        return 'Mid Price'
    return 'Low Price'

In [None]:
#wrap price_range as a Spark UDF that returns strings
brand_udf = udf(f=price_range, returnType=StringType())
#add each row's price bucket based on its mobile brand
df.withColumn('price_range', brand_udf(df.mobile)).show(n=10, truncate=False)

In [None]:
#using lambda function
# Null ages reach a Python UDF as None, and `None <= 30` raises TypeError in
# Python 3; map null to a null age_group instead of failing the task.
age_udf = udf(lambda age: None if age is None else ("young" if age <= 30 else "senior"), StringType())
#apply udf on dataframe
df.withColumn("age_group", age_udf(df.age)).show(10,False)

In [None]:
#pandas udf
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [None]:
#plain Python function, used below as a pandas UDF
def remaining_yrs(age):
    """Return the years left until age 100, i.e. ``100 - age``.

    Used as a pandas UDF, so ``age`` arrives as a pandas Series and the
    subtraction is element-wise; it works on plain numbers too.
    """
    return 100 - age

In [None]:
# library requires by pandas_udf()
!pip install pyarrow
!conda install pyarrow

In [None]:
#wrap remaining_yrs as a pandas UDF returning integers
length_udf = pandas_udf(remaining_yrs, returnType=IntegerType())
#apply it to the age column (the function receives batches of ages as pandas Series)
df.withColumn("yrs_left", length_udf(df.age)).show(n=10, truncate=False)

In [None]:
#function of two columns, used below as a pandas UDF
def prod(rating, exp):
    """Return ``rating * exp`` (element-wise when given pandas Series)."""
    return rating * exp

In [None]:
#wrap prod as a pandas UDF whose result column is double
prod_udf = pandas_udf(prod, returnType=DoubleType())
#apply it to two columns at once
df.withColumn("product", prod_udf(df.ratings, df.experience)).show(n=10, truncate=False)

In [None]:
#duplicate values
df.count()

In [None]:
#drop duplicate values
df=df.dropDuplicates()

In [None]:
#validate new count
df.count()

In [None]:
#drop column of dataframe
df_new=df.drop('mobile')

In [None]:
df_new.show(10)

In [None]:
# saving file (csv)

In [None]:
#current working directory
!pwd

In [None]:
#target directory 
#pathcsv_out='../out/df_csv'
pathcsv_out='gdrive/MyDrive/st1612_20221/out/df_csv'
# hacia S3
# write_uri='s3://bucket_name/df_csv'

In [None]:
#save the dataframe as single csv 
df.coalesce(1).write.format("csv").option("header","true").save(pathcsv_out)


In [None]:
# parquet

In [None]:
#target location
#pathparquet_out='../out/df_parquet'
pathparquet_out='gdrive/MyDrive/st1612_20221/out/df_parquet'

# hacia S3
# write_uri='s3://bucket_name/df_parquet'

In [None]:
#save the data into parquet format 
df.write.format('parquet').save(pathparquet_out)