In [153]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [154]:
spark = SparkSession.builder.appName("toughtworks").getOrCreate()

In [155]:
path_dataset = '/home/rafael/PycharmProjects/dataengineer/output/cbike/parquet/'

In [158]:
df = spark.read.load(path_dataset, header=True, sep=',', format='parquet', inferSchema=True)
df.show(5)

+------------+-------------------+-------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+
|tripduration|          starttime|           stoptime|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|    end_station_name|end_station_latitude|end_station_longitude|bikeid|  usertype|birth_year|gender|
+------------+-------------------+-------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+
|         364|2017-07-01 00:00:00|2017-07-01 00:06:05|             539|Metropolitan Ave ...|           40.71534825|           -73.96024116|          3107|Bedford Ave & Nas...|         40.72311651|         -73.95212324| 14744|Subscriber

In [159]:
df_reduced = df.select('bikeid', 'start_station_latitude', 'start_station_longitude', 'end_station_latitude', 'end_station_longitude')
df_reduced.show(5)

+------+----------------------+-----------------------+--------------------+---------------------+
|bikeid|start_station_latitude|start_station_longitude|end_station_latitude|end_station_longitude|
+------+----------------------+-----------------------+--------------------+---------------------+
| 14744|           40.71534825|           -73.96024116|         40.72311651|         -73.95212324|
| 19587|     40.73020660529954|     -73.99102628231049|          40.7892105|         -73.94370784|
| 27937|     40.69102925677968|     -73.99183362722397|          40.6763947|         -73.99869893|
| 26066|             40.716887|             -73.963198|         40.69165183|          -73.9999786|
| 29408|             40.716887|             -73.963198|         40.69165183|          -73.9999786|
+------+----------------------+-----------------------+--------------------+---------------------+
only showing top 5 rows



In [160]:
df_transformed = df_reduced.withColumn("haversine", (
        F.pow(F.sin(F.radians(F.col("end_station_latitude") - F.col("start_station_latitude")) / 2), 2) +
        F.cos(F.radians(F.col("start_station_latitude"))) * F.cos(F.radians(F.col("end_station_latitude"))) *
        F.pow(F.sin(F.radians(F.col("end_station_longitude") - F.col("start_station_longitude")) / 2), 2)
    )).withColumn("distance", (F.atan2(F.sqrt(F.col("haversine")), F.sqrt(-F.col("haversine") + 1)) * 12742000) * 0.621371)
df_transformed.show(5)

+------+----------------------+-----------------------+--------------------+---------------------+--------------------+------------------+
|bikeid|start_station_latitude|start_station_longitude|end_station_latitude|end_station_longitude|           haversine|          distance|
+------+----------------------+-----------------------+--------------------+---------------------+--------------------+------------------+
| 14744|           40.71534825|           -73.96024116|         40.72311651|         -73.95212324|7.478474833864554E-9| 684.6917578845256|
| 19587|     40.73020660529954|     -73.99102628231049|          40.7892105|         -73.94370784|3.629574502486697E-7| 4769.978979112423|
| 27937|     40.69102925677968|     -73.99183362722397|          40.6763947|         -73.99869893|1.837404049958562E-8|1073.2253372467278|
| 26066|             40.716887|             -73.963198|         40.69165183|          -73.9999786|1.077025866238007...| 2598.373963247495|
| 29408|             40.716

In [163]:
df_transformed = df_transformed.withColumnRenamed('start_station_latitude', 'Lat_Ini')
df_transformed = df_transformed.withColumnRenamed('start_station_longitude', 'Lon_Ini')
df_transformed = df_transformed.withColumnRenamed('end_station_latitude', 'Lat_Fin')
df_transformed = df_transformed.withColumnRenamed('end_station_longitude', 'Lon_Fin')
df_transformed = df_transformed.withColumnRenamed('distance', 'distance_mil')
df_transformed = df_transformed.select('bikeid', 'Lat_Ini', 'Lon_Ini', 'Lat_Fin', 'Lon_Fin', 'distance_mil')
df_transformed.show(5)

+------+-----------------+------------------+-----------+------------+------------------+
|bikeid|          Lat_Ini|           Lon_Ini|    Lat_Fin|     Lon_Fin|      distance_mil|
+------+-----------------+------------------+-----------+------------+------------------+
| 14744|      40.71534825|      -73.96024116|40.72311651|-73.95212324| 684.6917578845256|
| 19587|40.73020660529954|-73.99102628231049| 40.7892105|-73.94370784| 4769.978979112423|
| 27937|40.69102925677968|-73.99183362722397| 40.6763947|-73.99869893|1073.2253372467278|
| 26066|        40.716887|        -73.963198|40.69165183| -73.9999786| 2598.373963247495|
| 29408|        40.716887|        -73.963198|40.69165183| -73.9999786| 2598.373963247495|
+------+-----------------+------------------+-----------+------------+------------------+
only showing top 5 rows



In [165]:
path_out = '/home/rafael/PycharmProjects/dataengineer/output/cbike/parquet/transformed'
df_transformed.write.format('parquet').mode('overwrite').save(path_out)

In [167]:
df = spark.read.load(path_out)
df.show(5)

+------+----------------------+--------------------+-----------------------+---------------------+--------------------+------------------+
|bikeid|start_station_latitude|end_station_latitude|start_station_longitude|end_station_longitude|           haversine|          distance|
+------+----------------------+--------------------+-----------------------+---------------------+--------------------+------------------+
| 14744|           40.71534825|         40.72311651|           -73.96024116|         -73.95212324|7.478474833864554E-9| 684.6917578845256|
| 19587|     40.73020660529954|          40.7892105|     -73.99102628231049|         -73.94370784|3.629574502486697E-7| 4769.978979112423|
| 27937|     40.69102925677968|          40.6763947|     -73.99183362722397|         -73.99869893|1.837404049958562E-8|1073.2253372467278|
| 26066|             40.716887|         40.69165183|             -73.963198|          -73.9999786|1.077025866238007...| 2598.373963247495|
| 29408|             40.716