In [48]:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys
import os
from pyspark.sql.window import Window
from pyspark.sql import functions as func
from pyspark.sql.functions import *

In [7]:
# Pull in the S3A filesystem JARs. PySpark reads PYSPARK_SUBMIT_ARGS when it
# launches the JVM, so this only takes effect if no SparkContext exists yet;
# if one is already running, getOrCreate() below returns it without these
# packages.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages com.amazonaws:aws-java-sdk:1.7.4,'
    'org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'
)

sc = SparkContext.getOrCreate()
# NOTE(review): SQLContext is a legacy entry point; SparkSession (already
# imported) supersedes it. Kept because later cells use sqlContext.read.
sqlContext = SQLContext(sc)

# AWS credentials come from the environment instead of being typed into the
# notebook, so they never end up in a saved/committed copy. An unset variable
# falls back to "", the same placeholder value the notebook used before.
aws_access_key = os.environ.get('AWS_ACCESS_KEY_ID', '')
aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '')

# _jsc is a private PySpark attribute (the underlying Java context); it is the
# usual route to the Hadoop configuration from Python.
hadoop_conf = sc._jsc.hadoopConfiguration()
# Serve s3:// URIs with the S3A connector.
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", aws_access_key)
hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)
# Same credentials for the older s3n connector.
hadoop_conf.set("fs.s3n.awsAccessKeyId", aws_access_key)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", aws_secret_key)

In [8]:
# Raw yellow-taxi trip records (file name indicates 2017-02).
# NOTE(review): the default is an absolute, user-specific path; set the
# TRIP_DATA_PATH environment variable to run this notebook elsewhere.
TRIP_DATA_PATH = os.environ.get(
    'TRIP_DATA_PATH', '/home/bella/airflow/yellow_tripdata_2017-02.csv'
)

# 'csv' is Spark's built-in CSV source (Spark 2.0+, which the SparkSession
# import already requires). inferSchema='true' makes Spark scan the data an
# extra time to work out column types.
source_df = sqlContext.read.load(TRIP_DATA_PATH,
                                 format='csv',
                                 header='true',
                                 inferSchema='true')

In [12]:
# Show the column names and types Spark inferred from the CSV
# (loaded with header='true', inferSchema='true').
source_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [56]:
# Keep only what the monthly average needs: the trip distance plus the
# calendar month and year of each trip's drop-off time.
DF = source_df.select(
    source_df['trip_distance'],
    func.month(source_df['tpep_dropoff_datetime']).alias('month'),
    func.year(source_df['tpep_dropoff_datetime']).alias('year'),
)

In [57]:
# Spot-check the first five rows; truncate=False prints full cell values.
DF.show(n=5, truncate=False)

+-------------+-----+----+
|trip_distance|month|year|
+-------------+-----+----+
|3.29         |2    |2017|
|2.8          |2    |2017|
|0.9          |2    |2017|
|0.72         |2    |2017|
|1.1          |2    |2017|
+-------------+-----+----+
only showing top 5 rows



In [67]:
# Average trip distance per calendar month (keyed on the drop-off timestamp).
# Sort by year first, then month, so rows come out chronologically; sorting
# by month first would interleave years if the data spans more than one.
monthly_avg = (
    DF.groupBy('month', 'year')
    .agg(func.avg('trip_distance').alias('avg_trip_distance'))
    .orderBy('year', 'month')
)

In [66]:
# Display the monthly averages (show's defaults: up to 20 rows, truncated cells).
# NOTE(review): the saved output below lists months 1, 3, 2 (unsorted), and this
# cell's execution count (66) is lower than that of the cell defining
# monthly_avg (67), so the output predates the current definition. Re-run.
monthly_avg.show(n=20, truncate=True)

+-----+----+------------------+
|month|year| avg_trip_distance|
+-----+----+------------------+
|    1|2017|              2.25|
|    3|2017| 6.050590961761299|
|    2|2017|2.8002866531663693|
+-----+----+------------------+



In [69]:
# Same projection as the monthly-average frame, plus the raw drop-off
# timestamp that the rolling window is ordered by.
DF_1 = source_df.select(
    'trip_distance',
    func.month('tpep_dropoff_datetime').alias('month'),
    func.year('tpep_dropoff_datetime').alias('year'),
    'tpep_dropoff_datetime',
)

In [70]:
# Spot-check the first five rows of the projection.
DF_1.show(n=5)

+-------------+-----+----+---------------------+
|trip_distance|month|year|tpep_dropoff_datetime|
+-------------+-----+----+---------------------+
|         3.29|    2|2017|  2017-02-03 02:20:55|
|          2.8|    2|2017|  2017-02-03 02:46:47|
|          0.9|    2|2017|  2017-02-03 09:56:17|
|         0.72|    2|2017|  2017-02-03 04:52:41|
|          1.1|    2|2017|  2017-02-03 12:16:14|
+-------------+-----+----+---------------------+
only showing top 5 rows



In [71]:
def days(i: int) -> int:
    """Convert a number of days to seconds.

    Handy for window range bounds when the window is ordered by a timestamp
    cast to ``long`` (epoch seconds).

    Args:
        i: Number of days (may be negative).

    Returns:
        ``i`` expressed in seconds (``i * 86400``).
    """
    return i * 86400  # 86400 = 24 h * 60 min * 60 s

In [76]:
# Length of the trailing rolling-average window, in days.
ROLLING_WINDOW_DAYS = 45

# Range frame over the drop-off time as epoch seconds: each row's frame covers
# every row whose drop-off lies between ROLLING_WINDOW_DAYS days earlier and
# the row's own timestamp (rows with the same timestamp are included).
# NOTE(review): there is no partitionBy, so Spark has to move every row into a
# single partition to evaluate this window; expect it to be slow and
# memory-heavy on a full month of trips.
window_spec = (
    Window
    .orderBy(func.col("tpep_dropoff_datetime").cast('long'))
    .rangeBetween(-days(ROLLING_WINDOW_DAYS), Window.currentRow)
)

In [77]:
# Append the windowed mean trip distance as a new 'rolling_avg' column
# after all existing DF_1 columns.
rolling_avg = DF_1.select(
    '*',
    func.avg('trip_distance').over(window_spec).alias('rolling_avg'),
)

In [78]:
# Spot-check the first five rows with their rolling averages.
# NOTE(review): the saved output includes drop-offs on 2017-01-24 and
# 2017-01-26 although the source file is named for 2017-02; these look like
# bad records that should probably be filtered out before averaging.
rolling_avg.show(n=5)

+-------------+-----+----+---------------------+------------------+
|trip_distance|month|year|tpep_dropoff_datetime|       rolling_avg|
+-------------+-----+----+---------------------+------------------+
|          1.4|    1|2017|  2017-01-24 15:39:43|               1.4|
|          3.1|    1|2017|  2017-01-26 15:34:18|              2.25|
|          0.0|    2|2017|  2017-02-01 00:00:57|               1.5|
|          0.0|    2|2017|  2017-02-01 00:01:28|             1.125|
|         0.32|    2|2017|  2017-02-01 00:01:55|0.8033333333333333|
+-------------+-----+----+---------------------+------------------+
only showing top 5 rows



In [79]:
# Stop the SparkContext created in the setup cell. Cells that use sc,
# sqlContext, or DataFrames built from them should not be re-run after this
# without first re-running the setup cell.
sc.stop()