In [1]:
# Importing the Required Packages
from pyspark import SparkContext,SQLContext
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import lower,col,upper

In [2]:
# Reading the files using the spark.read utilities
# File uploaded to /FileStore/tables/PersonalData.txt
# File uploaded to /FileStore/tables/TravelData.txt

In [3]:
# Obtain the SparkSession. Databricks pre-defines `spark`, and getOrCreate()
# simply returns that existing session there; elsewhere it builds one, so the
# notebook also survives "Restart Kernel -> Run All" outside Databricks.
spark = SparkSession.builder.getOrCreate()

# Both inputs are tab-separated text files with a header row. inferSchema=True
# derives column types from the data (one extra pass) instead of reading every
# column as a string. The two lines show two equivalent spark.read styles:
# the csv() shorthand and the generic format()/option()/load() chain.
ps_data_df = spark.read.csv('/FileStore/tables/PersonalData.txt', header=True, sep='\t', inferSchema=True)
tr_data_df = (
    spark.read.format('csv')
    .option('header', True)
    .option('sep', '\t')
    .option('inferSchema', True)
    .load('/FileStore/tables/TravelData.txt')
)

In [4]:
# Preview the first 20 rows of the personal data (long cell values truncated).
ps_data_df.show(n=20, truncate=True)

In [5]:
# Basic analysis: column names, schemas and row counts of both DataFrames.
# Jupyter only echoes a cell's LAST expression, so bare `df.columns` /
# `df.count()` lines in the middle of a cell are silently discarded;
# print them explicitly instead.

# Names of the columns
print('PersonalData columns:', ps_data_df.columns)
print('TravelData columns:', tr_data_df.columns)

# Schema of each DataFrame (printSchema() writes to stdout itself)
ps_data_df.printSchema()
tr_data_df.printSchema()

# Number of rows in each DataFrame (each count() runs a Spark job).
# The author's run reported 891 observations in each file.
print('PersonalData rows:', ps_data_df.count())
print('TravelData rows:', tr_data_df.count())

In [6]:
# Select operations: pick columns by name with select(), or select-and-rename
# with SQL expressions via selectExpr().
# show() prints the table itself and returns None, so it must not be wrapped
# in print() -- that would emit a stray "None" after every table.
ps_data_df.select(['PassengerId', 'Name']).show()
ps_data_df.selectExpr('Name as a', 'Survived as b').show()

tr_data_df.select(['PassengerId', 'Pclass']).show()
tr_data_df.selectExpr('PassengerId as a', 'Pclass as b').show()

In [7]:
# Count the female passengers. Upper-casing the Sex column before comparing
# makes the match case-insensitive ('female', 'Female', 'FEMALE' all count).
ps_data_df.where(upper(ps_data_df['Sex']) == 'FEMALE').count()
# Equivalent case-sensitive spellings (match the exact value 'female' only):
#   ps_data_df.filter(ps_data_df['Sex'] == 'female').count()
#   ps_data_df.filter(ps_data_df.Sex == 'female').count()
#   ps_data_df.filter("Sex=='female'").count()

In [8]:
# Derive AgeTimeFare = Age * Fare on the travel data; withColumn returns a new
# DataFrame, which is rebound to tr_data_df.
tr_data_df = tr_data_df.withColumn('AgeTimeFare', col('Age') * col('Fare'))

In [9]:
# Group the travel data by passenger class and summarise each group:
# number of passengers, average age and total fare.
# (Single-aggregate shortcut, for reference: grp_tr_dat_df.avg('Fare').show())
grp_tr_dat_df = tr_data_df.groupBy('Pclass')
grp_tr_dat_df.agg(dict(PassengerId='count', Age='avg', Fare='sum')).show()

In [10]:
# Sort the travel data by fare, cheapest first (orderBy is an alias of sort).
tr_data_df = tr_data_df.orderBy(col('Fare').asc())

In [11]:
# Join the personal data with the travel data.

# Equi-join on the shared key; passing the key as a list of column names
# performs an equi-join on that column.
maindf = ps_data_df.join(tr_data_df, on=['PassengerId'])

# Non-equi join: each personal row is paired with every travel row whose
# PassengerId is >= its own. Both PassengerId columns are kept, and the result
# can grow roughly quadratically with the number of rows.
non_equi_df = ps_data_df.join(tr_data_df, on=ps_data_df.PassengerId <= tr_data_df.PassengerId)

# Only a cell's last expression is echoed, so print both counts explicitly
# (previously the equi-join count was computed and then discarded).
print('Equi-join rows:', maindf.count())
print('Non-equi join rows:', non_equi_df.count())

In [12]:
# Union the personal data with itself (union keeps duplicate rows, like SQL
# UNION ALL) and display the result ordered by PassengerId.
(
    ps_data_df
    .union(ps_data_df)
    .orderBy(col('PassengerId').asc())
    .show()
)

In [13]:
# Print the physical plan Spark will execute for each DataFrame.
# tr_data_df's plan reflects the withColumn and sort steps applied above.
ps_data_df.explain(extended=False)
tr_data_df.explain(extended=False)

In [14]:
# Check the storage level of the DataFrame before caching it.
# In a fresh run, cache() has not been called yet, so this should report no
# storage (memory/disk both False).
ps_data_df.storageLevel

In [15]:
# Mark the personal data for caching and show the storage level it is now
# registered with. cache() is lazy (data is materialised by the next action)
# and returns the same DataFrame, so the attribute can be read off the call.
ps_data_df.cache().storageLevel

In [16]:
# Drop the personal data from the cache and confirm the storage level has been
# reset. unpersist() returns the same DataFrame, so the attribute is chained on.
ps_data_df.unpersist().storageLevel

In [17]:
# Inspect the RDD view of the personal data.
# Only a cell's last expression is echoed, so the storage level is printed
# explicitly (previously it was computed and silently discarded).
# NOTE(review): this is the storage level of the DataFrame's RDD view, which
# may differ from ps_data_df.storageLevel -- confirm which one is intended.
print('RDD storage level:', ps_data_df.rdd.getStorageLevel())

# Number of partitions backing the DataFrame
ps_data_df.rdd.getNumPartitions()

In [18]:
# Write the personal data as plain text via its RDD view: each Row is saved
# using its string representation, one per line, as a directory of part files
# at this path (not a single .txt file).
# NOTE(review): saveAsTextFile refuses to write to an existing path, so this
# cell fails on a re-run -- TODO delete the output first or switch to the
# DataFrame writer with .mode('overwrite').
ps_data_df.rdd.saveAsTextFile('/FileStore/tables/Modified.txt')

In [19]:
# Write the personal data as pipe-separated CSV with a header row.
# The save mode must be set with .mode(): .option('mode', ...) is only passed
# through as a data-source option, so the writer stays on the default
# 'errorifexists' mode and a re-run fails because the output already exists.
# Spark writes a directory of part files at this path, not a single .txt file.
(
    ps_data_df.write.format('csv')
    .mode('overwrite')
    .option('header', True)
    .option('sep', '|')
    .save('/FileStore/tables/Overall.txt')
)