# Python Spark SQL Project



Spark documentation available at:
https://spark.apache.org/docs/2.3.1/


## Q1- What is the accumulated number of taxi trips per month?


In [6]:
%%time
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('Taxi_Trips_151MB.csv')
    rows = lines.filter( lambda line : len(line) > 0 )   \
                        .map( lambda line : line.split(';') ) \
                        .map( lambda arr : Row( mes = arr[2][:-20].split(":")[0]))
    rowsDF = spark.createDataFrame( rows )
    rowsDF.createOrReplaceTempView("row")
    
    listIpsDF = spark.sql("SELECT mes,COUNT(mes) as total_trips FROM row GROUP BY mes ORDER by mes")
    
    listIpsDF.show(12)

    sc.stop()
except Exception as err:
    print(err)
    sc.stop()

+---+-----------+
|mes|total_trips|
+---+-----------+
| 01|      30357|
| 02|      31013|
| 03|      35260|
| 04|      32884|
| 05|      34979|
| 06|      35016|
| 07|      32141|
| 08|      32747|
| 09|      31466|
| 10|      33618|
| 11|      30017|
| 12|      29252|
+---+-----------+

CPU times: user 73 ms, sys: 82.2 ms, total: 155 ms
Wall time: 8.91 s


## Q2- For each pickup region, report the list of unique dropoff regions?




In [7]:
%%time
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('Taxi_Trips_151MB.csv')
    logRows = lines.filter( lambda line : len(line) > 0 )   \
                        .map( lambda line : line.split(';') ) \
                        .filter(lambda v: len(v[6])>0 and len(v[7])>0) \
                        .map( lambda arr : Row( pickup = arr[6], dropoff = arr[7]) )
    logRowsDF = spark.createDataFrame( logRows )
    logRowsDF.createOrReplaceTempView("log")
        
    listIpsDF = spark.sql("SELECT pickup as Pickup,collect_set(dropoff) as Dropoffs FROM log GROUP BY pickup")
    
    listIpsDF.show(1000)
    #listIpsDF.show(1000,False)

    sc.stop()
except Exception as err:
    print(err)
    sc.stop()

+-----------+--------------------+
|     Pickup|            Dropoffs|
+-----------+--------------------+
|17031460500|       [17031460500]|
|17031320100|[17031010600, 170...|
|17031815900|[17031815900, 170...|
|17031838000|[17031281900, 170...|
|17031805110|       [17031805110]|
|17031610800|       [17031836400]|
|17031570300|       [17031570300]|
|17031801607|       [17031801607]|
|17031210602|[17031081202, 170...|
|17031801100|       [17031801100]|
|17031283800|       [17031839100]|
|17031813200|       [17031813200]|
|17031040100|[17031040100, 170...|
|17031830700|[17031051400, 170...|
|17031221200|[17031832500, 170...|
|17031242300|[17031221300, 170...|
|17031806004|       [17031806004]|
|17031806900|       [17031980100]|
|17031080202|[17031062800, 170...|
|17031081700|[17031062800, 170...|
|17031081900|[17031070300, 170...|
|17031243500|[17031051100, 170...|
|17031804308|       [17031804308]|
|17031801702|[17031801702, 170...|
|17031834300|       [17031834300]|
|17031060400|[170310

## Q3- What is the expected charge/cost of a taxi ride, given the pickup region ID, the weekday (0=Monday, 6=Sunday) and time in format “hour AM/PM”?




In [8]:
%%time
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import datetime
import statistics as statis
spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('Taxi_Trips_151MB.csv')
    rows = lines.filter( lambda line : len(line) > 0 )   \
                        .map( lambda line : line.split(';') )\
                        .filter(lambda v: len(v[6])>0 and len(v[2])>0 and len(v[14])>0 ) \
                        .map( lambda arr : Row( idTaxi_weekday_hour = arr[6]+"_"+str(datetime.date(int(arr[2][6:-11]),int(arr[2][:-20]),int(arr[2][3:-17])).weekday())+"_"+arr[2][11:-9]+arr[2][20:],\
                                              cost=float(arr[14].replace(',',''))))
    rowsDF = spark.createDataFrame( rows )
    rowsDF.createOrReplaceTempView("row")
    
    listIpsDF = spark.sql("SELECT idTaxi_weekday_hour,format_number(AVG(COST),2) as AVG_TRIP_COST FROM row GROUP BY  idTaxi_weekday_hour")
    
    listIpsDF.show(100)

    sc.stop()
except Exception as err:
    print(err)
    sc.stop()

+-------------------+-------------+
|idTaxi_weekday_hour|AVG_TRIP_COST|
+-------------------+-------------+
| 17031281900_0_12PM|        10.43|
| 17031081700_5_11PM|         9.44|
| 17031281900_2_09AM|         8.59|
| 17031081300_6_10AM|        19.31|
| 17031281900_2_11AM|         9.57|
| 17031080100_4_10AM|         7.74|
| 17031320100_2_09PM|         9.93|
| 17031070300_1_08PM|         6.58|
| 17031070101_3_08PM|        10.44|
| 17031842200_5_04PM|         7.64|
| 17031081000_5_03AM|         8.37|
| 17031832600_3_06AM|         9.85|
| 17031081202_1_08PM|         9.15|
| 17031980000_0_06AM|        43.34|
| 17031081401_4_07PM|        10.24|
| 17031081600_3_04PM|         9.82|
| 17031081403_2_10AM|        11.42|
| 17031063400_4_11PM|         8.96|
| 17031832400_5_10PM|        11.85|
| 17031080100_3_11AM|         9.35|
| 17031320600_2_11AM|        17.53|
| 17031841000_4_11AM|        25.08|
| 17031241400_3_07PM|         9.00|
| 17031070300_0_08PM|         9.75|
| 17031320100_4_12AM|       

## Q4- How much money and how many trips where made under each Company per year?

In [10]:
%%time
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('words').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('Taxi_Trips_151MB.csv')
    logRows = lines.filter( lambda line : len(line) > 0 )   \
                        .map( lambda line : line.split(';') ) \
                        .filter(lambda v: len(v[2])>0 and len(v[14])>0 and len (v[16])) \
                        .map( lambda arr : Row( Company = arr[16], Year = arr[2][6:-12] , total_cost = arr[14]) )
    logRowsDF = spark.createDataFrame( logRows )
    logRowsDF.createOrReplaceTempView("log")
        
    listIpsDF = spark.sql("SELECT Company, Year, COUNT(Company) as Trips, format_number(SUM(total_cost),2) as Total FROM log GROUP BY Company, Year ORDER BY Company,Year")
    listIpsDF.show(1000, False)

    sc.stop()
except Exception as err:
    print(err)
    sc.stop()

+--------------------------------------------+----+-----+----------+
|Company                                     |Year|Trips|Total     |
+--------------------------------------------+----+-----+----------+
|0118 - 42111 Godfrey S.Awir                 |2014|7    |122.20    |
|0118 - 42111 Godfrey S.Awir                 |2015|7    |98.35     |
|0118 - 42111 Godfrey S.Awir                 |2016|15   |228.50    |
|0118 - 42111 Godfrey S.Awir                 |2017|11   |203.10    |
|0118 - 42111 Godfrey S.Awir                 |2018|1    |7.50      |
|0118 - Godfrey S.Awir                       |2014|14   |178.20    |
|0694 - 59280 Chinesco Trans Inc             |2014|4    |89.55     |
|0694 - 59280 Chinesco Trans Inc             |2015|9    |103.45    |
|0694 - 59280 Chinesco Trans Inc             |2016|3    |46.25     |
|0694 - 59280 Chinesco Trans Inc             |2017|4    |52.25     |
|0694 - 59280 Chinesco Trans Inc             |2018|2    |13.50     |
|0694 - Chinesco Trans Inc        