# Lab 2. Data Analytics Using PySpark
## Scalable and Distributed Computing
Sara Dovalo del Río, Alejandra Estrada Sanz and Luis Ángel Rodríguez García

### Part I
#### PySpark environment setup

In [26]:
# findspark has to run BEFORE anything from pyspark is imported: it locates the
# Spark installation and adds its Python bindings to sys.path. Importing pyspark
# first only works when pyspark already happens to be importable.
import findspark

findspark.init()

from IPython.display import display, Markdown
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import LongType, TimestampType, IntegerType, DoubleType, StringType
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit

# Reuse the running context/session if there is one, so re-running this cell
# does not build a second SparkSession on top of the same SparkContext.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

# Fixed seed so that random operations (e.g. sampling) are reproducible.
seed_value = 100469000


In [27]:
# Columns of the trip CSV files, in file order, with the type each one is parsed as.
#
# NOTE: 'improve_surcharge' does not match the CSV header, which calls the column
# 'improvement_surcharge'; this is what triggers Spark's "CSV header does not
# conform to the schema" warning. The schema is still applied by position, so the
# values are read correctly. The name is kept as-is because later cells may
# reference it -- TODO(review): rename to 'improvement_surcharge' once every
# usage has been checked.
schema_columns = [
    ('VendorID', LongType()),
    ('tpep_pickup_datetime', TimestampType()),
    ('tpep_dropoff_datetime', TimestampType()),
    ('passenger_count', IntegerType()),
    ('trip_distance', DoubleType()),
    ('rateCodeId', IntegerType()),
    ('store_and_fwd_flag', StringType()),
    ('PULocationID', StringType()),
    ('DOLocationID', StringType()),
    ('payment_type', IntegerType()),
    ('fare_amount', DoubleType()),
    ('extra', DoubleType()),
    ('mta_tax', DoubleType()),
    ('tip_amount', DoubleType()),
    ('tolls_amount', DoubleType()),
    ('improve_surcharge', DoubleType()),
    ('total_amount', DoubleType()),
]

# Fields are declared nullable because that is what the CSV reader produces
# anyway: printSchema() reports "nullable = true" for every column even when
# the schema asks for non-nullable fields.
custom_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in schema_columns]
)

# Load every CSV file under data/ with the explicit schema; the first line of
# each file is treated as a header and skipped.
cab_trips = (
    spark.read
    .schema(custom_schema)
    .option('header', 'true')
    .csv('data/*.csv')
)

cab_trips.printSchema()
display(Markdown(f"This DataFrame has **{cab_trips.count()} rows**."))

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- rateCodeId: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improve_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



This DataFrame has **1942420 rows**.

We use the `cache` method to keep the dataset in memory so that subsequent processing is faster. We then draw a random sample of ten percent of the rows, without replacement, and take its first two rows.

In [28]:
# cache() only marks the DataFrame for caching; the data is actually stored
# the first time an action (such as the take() below) runs over it.
cab_trips.cache()

# 10% random sample without replacement, seeded for reproducibility;
# take(2) returns the first two sampled rows as a list of Row objects.
cab_trips.sample(withReplacement=False, fraction=0.1, seed=seed_value).take(2)

22/03/17 01:43:34 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount
 Schema: VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, rateCodeId, store_and_fwd_flag, PULocationID, DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improve_surcharge, total_amount
Expected: improve_surcharge but found: improvement_surcharge
CSV file: file:///Users/luisrodrigar/Documents/Statistics%20for%20Data%20Science/Scaled%20and%20Distributed%20Computation/Project/scalable-distributed-computing/Lab%202%20-%20PySpark/data/tripdata_2017-01.csv
                                                                                

[Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2017, 1, 1, 0, 0, 2), tpep_dropoff_datetime=datetime.datetime(2017, 1, 1, 0, 39, 22), passenger_count=4, trip_distance=7.75, rateCodeId=1, store_and_fwd_flag='N', PULocationID='186', DOLocationID='36', payment_type=1, fare_amount=22.0, extra=0.5, mta_tax=0.5, tip_amount=4.66, tolls_amount=0.0, improve_surcharge=0.3, total_amount=27.96),
 Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2017, 1, 1, 0, 0, 6), tpep_dropoff_datetime=datetime.datetime(2017, 1, 1, 0, 16, 5), passenger_count=2, trip_distance=2.6, rateCodeId=1, store_and_fwd_flag='N', PULocationID='79', DOLocationID='163', payment_type=1, fare_amount=12.5, extra=0.5, mta_tax=0.5, tip_amount=2.76, tolls_amount=0.0, improve_surcharge=0.3, total_amount=16.56)]

22/03/17 07:27:37 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1038600 ms exceeds timeout 120000 ms
22/03/17 07:27:37 WARN SparkContext: Killing executors is not supported by current scheduler.


In [None]:
# Descriptive statistics (count, mean, stddev, min, quartiles, max) for the main
# numeric trip columns. The columns must exist in cab_trips' schema; selecting a
# column that is not there makes Spark raise an AnalysisException.
summary_columns = ["passenger_count", "trip_distance", "fare_amount", "tip_amount", "total_amount"]

print("Summary of columns %s:" % ", ".join(summary_columns))
cab_trips.select(*summary_columns).summary().show()