In [24]:
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Create (or reuse an already running) local SparkSession that uses a single
# worker thread.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkFirst")
    .getOrCreate()
)

# Explicit, all-nullable schema for the yellow-taxi trip CSV. It is passed to
# the CSV reader, so Spark does not have to infer column types.
# The pickup/drop-off columns are declared as TimestampType rather than
# DateType. The raw values presumably include a time of day, which the
# cleaning step later strips explicitly (TODO confirm against the CSV).
# DateType would silently drop the time at read time and rely on lenient date
# parsing of a 'yyyy-MM-dd HH:mm:ss'-style string.
data_schema = [
    StructField('VendorID', IntegerType(), True),
    StructField('tpep_pickup_datetime', TimestampType(), True),
    StructField('tpep_dropoff_datetime', TimestampType(), True),
    StructField('passenger_count', IntegerType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', IntegerType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', IntegerType(), True),
    StructField('DOLocationID', IntegerType(), True),
    StructField('payment_type', IntegerType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True),
    StructField('congestion_surcharge', DoubleType(), True),
]

final_struc = StructType(fields=data_schema)

# Read the trip CSV into a DataFrame using the explicit schema `final_struc`.
df = spark.read.csv('yellow_tripdata_2020-01.csv', sep=",", header=True, schema=final_struc)

# Clean the data: drop rows with an unknown passenger count and keep only
# trips with a positive distance, a positive fare and a positive tip.
df = (
    df.dropna(subset=['passenger_count'])
    .filter(col("trip_distance") > 0)
    .filter(col("fare_amount") > 0)
    .filter(col("tip_amount") > 0)
)

# Reduce the drop-off column to a calendar date. A single to_date() is enough.
# It truncates a timestamp to its date (in the session time zone) and returns a
# DateType column unchanged, so there is no need to format to a
# 'yyyy-MM-dd' string and parse it back.
df = df.withColumn('tpep_dropoff_datetime', to_date(col('tpep_dropoff_datetime')))

# Register the cleaned data as the `stage` temp view queried by the SQL cells.
temp_table_name = 'stage'
df.createOrReplaceTempView(temp_table_name)

In [25]:
# Select the columns needed for the later calculations and assign every trip
# from `stage` to a passenger group: '0', '1', '2' or '3'. Any other passenger
# count (the ELSE branch) falls into '4_plus'.
sql="""
select 
     tpep_dropoff_datetime,
     passenger_count,
     case 
        when passenger_count = 0 then '0'
        when passenger_count = 1 then '1'
        when passenger_count = 2 then '2'
        when passenger_count = 3 then '3'
        else '4_plus'
    end as passenger_group,
    trip_distance,  
    tip_amount, 
    total_amount
from stage
"""
# Run the query and register the result as the `pas_group` temp view.
df = spark.sql(sql)
df.createOrReplaceTempView('pas_group')

In [26]:
# Percentage of each drop-off date's trips that fall into each passenger group.
# The result has one row per (date, passenger_group). The group's own
# percentage_* column holds its value and the other four hold the placeholder '-'.
# Groups are compared with '=' rather than LIKE: in LIKE, '_' is a
# single-character wildcard, so "like '4_plus'" would also match e.g. '4Xplus'.
# NOTE(review): mixing the numeric percentage with the string '-' makes Spark
# coerce the percentage_* columns to a string type (default non-ANSI coercion).
# Use NULL as the placeholder if numeric columns are wanted downstream.
sql="""
select
    tpep_dropoff_datetime,
    passenger_group,
    case when passenger_group = '0' then percentage else '-' end as percentage_zero,
    case when passenger_group = '1' then percentage else '-' end as percentage_1p,
    case when passenger_group = '2' then percentage else '-' end as percentage_2p,
    case when passenger_group = '3' then percentage else '-' end as percentage_3p,
    case when passenger_group = '4_plus' then percentage else '-' end as percentage_4p_plus
from (
    -- per (date, group): group trip count / total trips that date * 100, rounded to 2 places
    select
        tpep_dropoff_datetime,
        passenger_group,
        round(100 * count(passenger_count)
              / sum(count(passenger_count)) over (partition by tpep_dropoff_datetime), 2) as percentage
    from pas_group
    group by tpep_dropoff_datetime, passenger_group
) as per_group
"""
# Run the query and register the result as the `pas_perc` temp view.
df = spark.sql(sql)
df.createOrReplaceTempView('pas_perc')

In [27]:
# Most and least expensive trip (by total_amount) for every drop-off date and
# passenger group. A plain GROUP BY returns exactly one row per
# (date, group). It avoids the window sort and extra de-duplication pass that
# SELECT DISTINCT over window aggregates requires, and yields the same rows.
sql="""
select
    tpep_dropoff_datetime,
    passenger_group,
    max(total_amount) AS max,
    min(total_amount) AS min
from pas_group
group by tpep_dropoff_datetime, passenger_group
"""
# Run the query and register the result as the `total_min_max` temp view.
df = spark.sql(sql)
df.createOrReplaceTempView('total_min_max')

In [28]:
# Build the final table: for every (drop-off date, passenger group), the
# percentage columns from `pas_perc` next to the min/max trip totals from
# `total_min_max`.
# Rows are sorted by date and then by passenger group. Sorting by date alone
# leaves the order of a day's rows unspecified.
# The query has no trailing ';'. spark.sql() on Spark versions before 3.0
# rejects it with a ParseException.
sql="""
select
    tpep_dropoff_datetime AS date,
    percentage_zero,
    percentage_1p,
    percentage_2p,
    percentage_3p,
    percentage_4p_plus,
    min, max
from pas_perc join total_min_max using (tpep_dropoff_datetime, passenger_group)
order by tpep_dropoff_datetime, passenger_group
"""
# Run the query and register the result as the `result` temp view.
df = spark.sql(sql)
df.createOrReplaceTempView('result')

In [29]:
# Write the final table to a single .parquet file.
# The data is read from the `result` temp view rather than the shared `df`
# variable, which several cells reassign. This way the export does not depend
# on which cell happened to run last.
# toPandas() collects everything to the driver. That is acceptable here
# because the table has one row per (date, passenger group).
# NOTE(review): pandas.to_parquet needs pyarrow or fastparquet installed.
spark.table('result').toPandas().to_parquet('taxi_result.parquet')