In [0]:
from pyspark import SparkContext

# Databricks (and the pyspark shell) already run a SparkContext and expose it
# as `sc`. getOrCreate() returns that running context instead of raising
# "Cannot run multiple SparkContexts at once", and creates one when the
# notebook is executed on a plain kernel where no `sc` was injected.
sc=SparkContext.getOrCreate()
sc

In [0]:
# Load the raw trip CSV as an RDD of text lines (one string per line).
input=sc.textFile("/FileStore/tables/yellow_tripdata.csv")
# Only the last expression of a cell is rendered, so the intermediate
# inspections are printed explicitly instead of being computed and discarded.
for line in input.take(20):
    print(line)
print("partitions as loaded:", input.getNumPartitions())
# repartition(4) reshuffles the lines into 4 partitions; the original line
# order is not preserved by the shuffle.
input1=input.repartition(4)
input1.getNumPartitions()

In [0]:
# The header is read from the file itself: input1 went through repartition(),
# which shuffles rows, so input1.first() is not guaranteed to be the CSV header
# and could remove a data row while leaving the real header in the data.
header=sc.textFile("/FileStore/tables/yellow_tripdata.csv").first()
# Keep every line that differs from the header line.
input=input1.filter(lambda line:line!=header)
input.take(20)
# Later cells read '/FileStore/tables/yellow_tripdata_format.csv' and
# '/FileStore/tables/yellow_tripdata.txt'; presumably these saves produced
# them in an earlier run. They stay disabled because saveAsTextFile fails when
# the target path already exists.
#input.saveAsTextFile("/FileStore/tables/yellow_tripdata.txt")
#input.saveAsTextFile("/FileStore/tables/yellow_tripdata_format.csv")

In [0]:
# importing types of pyspark.sql
from pyspark.sql.types import *
# Explicit schema for the trip CSV, declared as (column name, Spark type)
# pairs; the reader maps them to the CSV columns by position and every column
# is nullable.
# The latitude columns are spelled 'latitude' so they match the column names
# used by the external-table DDL (yellow_tripdata_ext).
# NOTE(review): payment_type (Integer here, string in that DDL) and
# trip_time_in_secs (String here, int in that DDL) disagree between the two
# definitions - confirm against the data which side is right.
_schema_columns=[
    ('vendor_id',StringType()),
    ('pickup_datetime',StringType()),
    ('dropoff_datetime',StringType()),
    ('passenger_count',IntegerType()),
    ('trip_distance',FloatType()),
    ('pickup_longitude',FloatType()),
    ('pickup_latitude',FloatType()),
    ('rate_code',IntegerType()),
    ('store_and_fwd_flag',StringType()),
    ('dropoff_longitude',FloatType()),
    ('dropoff_latitude',FloatType()),
    ('payment_type',IntegerType()),
    ('fare_amount',FloatType()),
    ('extra',FloatType()),
    ('mta_tax',FloatType()),
    ('tip_amount',FloatType()),
    ('tolls_amount',FloatType()),
    ('total_amount',FloatType()),
    ('trip_time_in_secs',StringType()),
]
data_schema=StructType([StructField(name,dtype,True) for name,dtype in _schema_columns])

In [0]:
# type() is printed because only a cell's last expression is rendered; as a
# bare statement in the middle of the cell its result was thrown away.
print(type(data_schema))
data_schema

In [0]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the session the platform already started (Databricks
# exposes it as `spark`) and builds one on a plain kernel. Hive support is
# requested because later cells create Hive tables through saveAsTable and
# CREATE EXTERNAL TABLE.
spark=SparkSession.builder.appName("Yellow Tripdata Analysis").enableHiveSupport().getOrCreate()
spark


In [0]:
# Reading from filestore to dataframe.
# inferSchema=True makes Spark scan the data to derive column types; without
# it every column is read as a string, so nothing would actually be inferred.
df=spark.read.csv("/FileStore/tables/yellow_tripdata_format.csv",inferSchema=True)
df.show()

In [0]:
# Load the same CSV again, this time typed by the hand-written data_schema
# instead of the reader's default column types.
csv_reader=spark.read.schema(data_schema)
df_schema=csv_reader.csv("/FileStore/tables/yellow_tripdata_format.csv")
df_schema.show()

In [0]:
# Write the typed DataFrame into the Hive metastore as a managed table.
# IF NOT EXISTS and mode="overwrite" keep the cell re-runnable: a plain
# CREATE DATABASE / saveAsTable raises once the database or table exists.
spark.sql("CREATE DATABASE IF NOT EXISTS Project")
df_schema.write.saveAsTable("Project.yellow_tripdata",mode="overwrite")
# The Hive tables are located under /user/hive/warehouse/project.db
# By default the table is stored as parquet files.

In [0]:
# Write the same data as a managed table stored in ORC format.
# mode="overwrite" keeps the cell re-runnable; the default mode raises when
# the table already exists.
df_schema.write.saveAsTable("Project.yellow_tripdata_orc",format='orc',mode="overwrite")

In [0]:
# Write the same data as a managed table stored in Avro format.
# mode="overwrite" keeps the cell re-runnable; the default mode raises when
# the table already exists.
df_schema.write.saveAsTable("Project.yellow_tripdata_avro",format='avro',mode="overwrite")

In [0]:
# Hive queries issued through Spark: make `project` the current database,
# then preview the first 20 rows of the managed table.
spark.sql("USE project;")
managed_preview_df=spark.sql("SELECT * FROM yellow_tripdata LIMIT 20;")
managed_preview_df.show()

In [0]:
# HQL statement that declares the external Hive table, assembled from
# (column name, column type) pairs in table column order.
# NOTE(review): DECIMAL(9,6) leaves only 3 digits before the decimal point -
# confirm that no distance or amount in the data reaches 1000.
_ext_columns=[
    ("vendor_id","string"),
    ("pickup_datetime","string"),
    ("dropoff_datetime","string"),
    ("passenger_count","int"),
    ("trip_distance","DECIMAL(9,6)"),
    ("pickup_longitude","DECIMAL(9,6)"),
    ("pickup_latitude","DECIMAL(9,6)"),
    ("rate_code","int"),
    ("store_and_fwd_flag","string"),
    ("dropoff_longitude","DECIMAL(9,6)"),
    ("dropoff_latitude","DECIMAL(9,6)"),
    ("payment_type","string"),
    ("fare_amount","DECIMAL(9,6)"),
    ("extra","DECIMAL(9,6)"),
    ("mta_tax","DECIMAL(9,6)"),
    ("tip_amount","DECIMAL(9,6)"),
    ("tolls_amount","DECIMAL(9,6)"),
    ("total_amount","DECIMAL(9,6)"),
    ("trip_time_in_secs","int"),
]
_ext_column_defs=",".join(col_name+" "+col_type for col_name,col_type in _ext_columns)
ext_table_sql="CREATE EXTERNAL TABLE IF NOT EXISTS yellow_tripdata_ext("+_ext_column_defs+")"
ext_table_sql

In [0]:
# Run the DDL: the column definition from ext_table_sql followed by the storage
# clauses (comma-delimited text rows read from the given location), one clause
# per line.
ddl_clauses=[
    ext_table_sql,
    "row format delimited",
    "fields terminated by ','",
    "location '/FileStore/tables/yellow_tripdata.txt'",
]
ext_hive_table=spark.sql("\n".join(ddl_clauses))

In [0]:
# Read back through the external table: show its first 20 rows.
ext_preview_df=spark.sql("SELECT * FROM yellow_tripdata_ext LIMIT 20;")
ext_preview_df.show()

In [0]:
# Sample analysis on the data.
# Start with a projection of all columns, limited to 5 rows.
sample_query="SELECT * FROM yellow_tripdata_ext LIMIT 5;"
spark.sql(sample_query).show()

In [0]:
# 1. Total number of trips, i.e. the number of rows in the table.
trip_count_df=spark.sql("SELECT COUNT(*) FROM yellow_tripdata_ext;")
trip_count_df.show()

In [0]:
# 2. Total revenue generated by all the trips: the sum of the total_amount column.
revenue_query="SELECT SUM(total_amount) FROM yellow_tripdata_ext;"
spark.sql(revenue_query).show()

In [0]:
# 3. Fraction of the total revenue that was paid for tolls (tolls_amount over total_amount).
toll_fraction_df=spark.sql("SELECT SUM(tolls_amount)/SUM(total_amount) FROM yellow_tripdata_ext;")
toll_fraction_df.show()

In [0]:
# 4. Fraction of the total revenue that went to driver tips (tip_amount over total_amount).
tip_fraction_query="SELECT SUM(tip_amount)/SUM(total_amount) FROM yellow_tripdata_ext;"
spark.sql(tip_fraction_query).show()

In [0]:
# 5. Average amount charged per trip.
avg_amount_df=spark.sql("SELECT AVG(total_amount) FROM yellow_tripdata_ext;")
avg_amount_df.show()

In [0]:
# 6. Average trip distance, taken from the trip_distance column.
avg_distance_query="SELECT AVG(trip_distance) FROM yellow_tripdata_ext;"
spark.sql(avg_distance_query).show()

In [0]:
# 7. Number of different payment types in use, followed by the distinct values themselves.
payment_type_count_df=spark.sql("SELECT COUNT(DISTINCT payment_type) FROM yellow_tripdata_ext;")
payment_type_count_df.show()
payment_type_values_df=spark.sql("SELECT DISTINCT payment_type FROM yellow_tripdata_ext;")
payment_type_values_df.show()

In [0]:
# 8. Per payment type, report:
#    - the average fare (fare_amount)
#    - the average tip (tip_amount)
#    - the average tax (mta_tax)
per_payment_query="SELECT payment_type,AVG(fare_amount) AS avg_fare, AVG(tip_amount) AS avg_tip, AVG(mta_tax) AS avg_tax FROM yellow_tripdata_ext GROUP by payment_type;"
spark.sql(per_payment_query).show()

In [0]:
# Print the column names and types of the typed trip DataFrame.
df_schema.printSchema()
# describe() computes summary statistics (count, mean, stddev, min, max) per column.
yellow_tripdata_summary=df_schema.describe()
yellow_tripdata_summary.printSchema()
# Persist the summary as a table: the following cell queries
# yellow_tripdata_summary through SQL, which fails on a fresh run unless the
# table is written here. mode="overwrite" keeps the cell re-runnable.
yellow_tripdata_summary.write.saveAsTable("project.yellow_tripdata_summary",mode="overwrite")

In [0]:
# Inspect the persisted summary table: first its column layout, then its first 5 rows.
summary_layout_df=spark.sql("describe yellow_tripdata_summary;")
summary_layout_df.show()
summary_rows_df=spark.sql("SELECT * FROM yellow_tripdata_summary LIMIT 5;")
summary_rows_df.show()