In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder \
   .appName("Neural Network Model") \
   .config("spark.executor.memory", "3gb") \
   .getOrCreate()
   
sc = spark.sparkContext

In [None]:
sc

**2. Data Understanding using SparkSQL**

In [None]:
! wget https://storage.googleapis.com/class25jan2022/share/2008.csv

--2022-03-02 09:52:16--  https://storage.googleapis.com/class25jan2022/share/2008.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.218.128, 142.251.18.128, 142.250.153.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.218.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 689413344 (657M) [text/csv]
Saving to: ‘2008.csv.1’


2022-03-02 09:52:19 (212 MB/s) - ‘2008.csv.1’ saved [689413344/689413344]



In [None]:
! wc -l ./2008.csv

7009729 ./2008.csv


In [None]:
! head -3 2008.csv

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2008,1,3,4,2003,1955,2211,2225,WN,335,N712SW,128,150,116,-14,8,IAD,TPA,810,4,8,0,,0,NA,NA,NA,NA,NA
2008,1,3,4,754,735,1002,1000,WN,3231,N772SW,128,145,113,2,19,IAD,TPA,810,5,10,0,,0,NA,NA,NA,NA,NA


In [None]:
raw_df = spark.read.format('csv').\
option('header','true').option('mode','DROPMALFORMED')\
.load('2008.csv')

In [None]:
colors = ['white'  ,'green' ,'yellow','red' ,'brown' ,'pink' ]

In [None]:
color_rdd = sc.parallelize(colors)

In [None]:
keyval_rdd = color_rdd.map(lambda x : (x , len(x)))

In [None]:
color_df = keyval_rdd.toDF(['color' , 'length'])

In [None]:
color_df.show()

+------+------+
| color|length|
+------+------+
| white|     5|
| green|     5|
|yellow|     6|
|   red|     3|
| brown|     5|
|  pink|     4|
+------+------+



In [None]:
color_df.printSchema()

root
 |-- color: string (nullable = true)
 |-- length: long (nullable = true)



In [None]:
color_df.show(3)

+------+------+
| color|length|
+------+------+
| white|     5|
| green|     5|
|yellow|     6|
+------+------+
only showing top 3 rows



In [None]:
color_df.columns

['color', 'length']

In [None]:
color_df.filter(color_df.length.between(4,5))\
.select(color_df.color.alias('mid_length')).show()

+----------+
|mid_length|
+----------+
|     white|
|     green|
|     brown|
|      pink|
+----------+



In [None]:
color_df.filter(color_df.length> 4).filter(color_df[0]!='white').show()

+------+------+
| color|length|
+------+------+
| green|     5|
|yellow|     6|
| brown|     5|
+------+------+



In [None]:
color_df.filter((color_df.length> 4) & (color_df[0] != 'white')).show()

+------+------+
| color|length|
+------+------+
| green|     5|
|yellow|     6|
| brown|     5|
+------+------+



In [None]:
color_df.filter(color_df['length'] >= 4).sort("length" , 'color' , ascending = False).show()

+------+------+
| color|length|
+------+------+
|yellow|     6|
| white|     5|
| green|     5|
| brown|     5|
|  pink|     4|
+------+------+



In [None]:
color_df.orderBy('length', 'color').show()

+------+------+
| color|length|
+------+------+
|   red|     3|
|  pink|     4|
| brown|     5|
| green|     5|
| white|     5|
|yellow|     6|
+------+------+



In [None]:
color_df.sort(color_df.length.desc()).show()

+------+------+
| color|length|
+------+------+
|yellow|     6|
| brown|     5|
| white|     5|
| green|     5|
|  pink|     4|
|   red|     3|
+------+------+



In [None]:
color_df.groupby('length').count().show()

+------+-----+
|length|count|
+------+-----+
|     6|    1|
|     5|    3|
|     3|    1|
|     4|    1|
+------+-----+



In [None]:
color_df.groupby('length').count().show()

+------+-----+
|length|count|
+------+-----+
|     6|    1|
|     5|    3|
|     3|    1|
|     4|    1|
+------+-----+



In [None]:
color_df.describe().show()

+-------+------+------------------+
|summary| color|            length|
+-------+------+------------------+
|  count|     6|                 6|
|   mean|  null| 4.666666666666667|
| stddev|  null|1.0327955589886444|
|    min| brown|                 3|
|    max|yellow|                 6|
+-------+------+------------------+



In [None]:
color_df.orderBy('length','color').take(4)

[Row(color='red', length=3),
 Row(color='pink', length=4),
 Row(color='brown', length=5),
 Row(color='green', length=5)]

In [None]:
color_df.sort(color_df.length.desc()).show()

+------+------+
| color|length|
+------+------+
|yellow|     6|
| white|     5|
| brown|     5|
| green|     5|
|  pink|     4|
|   red|     3|
+------+------+



In [None]:
color_df.groupby('length').count().show()

+------+-----+
|length|count|
+------+-----+
|     6|    1|
|     5|    3|
|     3|    1|
|     4|    1|
+------+-----+



In [None]:
df = sc.parallelize([
                     (10,'',10000) , (20,'Female',30000) , (None,'Male',80000) , (None,'Male',5000)
]).toDF(['age','gender','income'])

df.show()

+----+------+------+
| age|gender|income|
+----+------+------+
|  10|      | 10000|
|  20|Female| 30000|
|null|  Male| 80000|
|null|  Male|  5000|
+----+------+------+



In [None]:
df.describe().show()

+-------+------------------+------+-----------------+
|summary|               age|gender|           income|
+-------+------------------+------+-----------------+
|  count|                 2|     4|                4|
|   mean|              15.0|  null|          31250.0|
| stddev|7.0710678118654755|  null|34247.87098005753|
|    min|                10|      |             5000|
|    max|                20|  Male|            80000|
+-------+------------------+------+-----------------+



# Data Cleansing : Null

In [None]:
avg_age = df.na.drop().agg({'age':'avg'})

In [None]:
avg_age.show()

+--------+
|avg(age)|
+--------+
|    15.0|
+--------+



In [None]:
avg_age = df.na.drop().select('age').agg({'age':'avg'})

In [None]:
avg_age.show()

+--------+
|avg(age)|
+--------+
|    15.0|
+--------+



In [None]:
avg_age = avg_age.collect()[0][0]

In [None]:
avg_age

15.0

In [None]:
from pyspark.sql.functions import *
sparkf_replaceNull = udf(lambda x : avg_age if x == None else x)

In [None]:
no_null_df = df.withColumn('age' , sparkf_replaceNull(col('age')))

In [None]:
no_null_df.show()

+----+------+------+
| age|gender|income|
+----+------+------+
|  10|      | 10000|
|  20|Female| 30000|
|15.0|  Male| 80000|
|15.0|  Male|  5000|
+----+------+------+



# Data Cleansing : Empty Value

In [None]:
 treat_missing = udf(lambda x : "Male_Assume" if x == "" else x)

In [None]:
no_missing_df = no_null_df.withColumn('new_gender',\
                                      treat_missing(no_null_df.gender))

In [None]:
no_missing_df.show()

+----+------+------+-----------+
| age|gender|income| new_gender|
+----+------+------+-----------+
|  10|      | 10000|Male_Assume|
|  20|Female| 30000|     Female|
|15.0|  Male| 80000|       Male|
|15.0|  Male|  5000|       Male|
+----+------+------+-----------+



In [None]:
no_outlier_df = no_missing_df.filter(no_missing_df.income >= 10000)

In [None]:
no_outlier_df.show()

+----+------+------+-----------+
| age|gender|income| new_gender|
+----+------+------+-----------+
|  10|      | 10000|Male_Assume|
|  20|Female| 30000|     Female|
|15.0|  Male| 80000|       Male|
+----+------+------+-----------+



In [None]:
df.show()

+----+------+------+
| age|gender|income|
+----+------+------+
|  10|      | 10000|
|  20|Female| 30000|
|null|  Male| 80000|
|null|  Male|  5000|
+----+------+------+

