In [None]:
import os
import boto3
import awswrangler as wr
from datetime import datetime
from geopy.distance import geodesic
from pyspark.sql.types import FloatType, StructType
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [None]:
@F.udf(returnType=FloatType())
def geodesic_udf(a, b):
    """Spark UDF returning the geodesic distance between two points, in kilometres.

    ``a`` and ``b`` are passed unchanged to ``geopy.distance.geodesic``.
    NOTE(review): presumably each is a (lat, lng) pair, e.g. a struct/array
    column built from ``location.lat`` / ``location.lng``. Confirm at the call site.

    Returns None (SQL NULL) when either point is missing. Without this guard,
    geopy would raise inside the executor and abort the whole Spark job on the
    first event that has no location.
    """
    if a is None or b is None:
        return None
    return geodesic(a, b).km

In [None]:
# Start the notebook's Spark session, or reuse the one already active in this kernel.
spark = (
    SparkSession.builder
    .appName("Loka Application")
    .getOrCreate()
)

In [72]:
# Spark datetime pattern for the ISO-8601 strings in the raw events. Without this
# line, this cell depends on `timestamp_format` being assigned further down the
# notebook and raises NameError on Restart & Run All.
timestamp_format = "yyyy-MM-dd'T'HH:mm:ss.SSSX"

df = spark.read.option("mergeSchema", "true").json("/tmp/loka-data/*")

# Parse `at` in place. Stage the nested timestamps as temporary top-level columns,
# fold them into the `data` struct after its original fields, then drop the
# top-level copies.
df = (
    df.withColumn("at", F.to_timestamp(df.at, timestamp_format))
    .withColumn("date_start", F.to_timestamp(df.data.start, timestamp_format))
    .withColumn("date_finish", F.to_timestamp(df.data.finish, timestamp_format))
    .withColumn(
        "date_location_at", F.to_timestamp(df.data.location.at, timestamp_format)
    )
    .withColumn(
        "data",
        F.struct(
            "data.*",
            "date_start",
            "date_finish",
            "date_location_at",
        ),
    )
    .drop("date_start", "date_finish", "date_location_at")
)
df.show()

                                                                                

+--------------------+--------------------+------+-------+---------------+
|                  at|                data| event|     on|organization_id|
+--------------------+--------------------+------+-------+---------------+
|2019-06-01 19:17:...|{null, bac5188f-6...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 3a3eb23a-f...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, f06eb89c-a...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, f0b87796-b...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, e641b45f-f...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 9152c5d8-7...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 949798fc-5...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 9d6a8840-d...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 3b0640d6-5...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 98c8b8cb-7...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{nu

In [None]:
# Events whose parsed `data.location.at` timestamp is non-null, with `data` expanded into columns.
df.filter(df["data"]["date_location_at"].isNotNull()).select("data.*").show()

In [66]:
# Re-display the first rows of the parsed events (long values truncated).
df.show(n=20, truncate=True)

+--------------------+--------------------+------+-------+---------------+
|                  at|                data| event|     on|organization_id|
+--------------------+--------------------+------+-------+---------------+
|2019-06-01 19:17:...|{null, bac5188f-6...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 3a3eb23a-f...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, f06eb89c-a...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, f0b87796-b...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, e641b45f-f...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 9152c5d8-7...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 949798fc-5...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 9d6a8840-d...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 3b0640d6-5...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 98c8b8cb-7...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{nu

In [75]:
# DataFrame.show() prints and returns None. Keep the filtered frame in `df_vehicle`
# and display it separately, so later cells receive a DataFrame instead of None.
df_vehicle = df.where(df.on == "vehicle")
df_vehicle.show()

+--------------------+--------------------+------+-------+---------------+
|                  at|                data| event|     on|organization_id|
+--------------------+--------------------+------+-------+---------------+
|2019-06-01 19:17:...|{null, bac5188f-6...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 3a3eb23a-f...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, f06eb89c-a...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, f0b87796-b...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, e641b45f-f...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 9152c5d8-7...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 949798fc-5...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 9d6a8840-d...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 3b0640d6-5...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{null, 98c8b8cb-7...|update|vehicle|         org-id|
|2019-06-01 19:17:...|{nu

In [77]:
timestamp_format = "yyyy-MM-dd'T'HH:mm:ss.SSSX"
df = spark.read.option("mergeSchema", "true").json("/tmp/loka-data/*")
df = (
    df
    # Parse the event timestamp in place (the column keeps its position).
    .withColumn("at", F.to_timestamp(F.col("at"), timestamp_format))
    # Stage parsed / flattened values as temporary top-level columns ...
    .withColumn("date_start", F.to_timestamp(F.col("data.start"), timestamp_format))
    .withColumn("date_finish", F.to_timestamp(F.col("data.finish"), timestamp_format))
    .withColumn("location_at", F.to_timestamp(F.col("data.location.at"), timestamp_format))
    .withColumn("location_lat", F.col("data.location.lat"))
    .withColumn("location_lng", F.col("data.location.lng"))
    # ... append them to the `data` struct after its original fields ...
    .withColumn(
        "data",
        F.struct(
            "data.*",
            "date_start",
            "date_finish",
            "location_at",
            "location_lat",
            "location_lng",
        ),
    )
    # ... and remove the temporary top-level copies.
    .drop("date_start", "date_finish", "location_at", "location_lat", "location_lng")
)

                                                                                

In [90]:
# Expand the `data` struct of the operating-period events into top-level columns.
df.filter(df["on"] == "operating_period").select("data.*").show()

+--------------------+----+--------+--------------------+--------------------+--------------------+-----------+------------+------------+
|              finish|  id|location|               start|          date_start|         date_finish|location_at|location_lat|location_lng|
+--------------------+----+--------+--------------------+--------------------+--------------------+-----------+------------+------------+
|2019-06-01T18:28:...|op_1|    null|2019-06-01T18:23:...|2019-06-01 19:23:...|2019-06-01 19:28:...|       null|        null|        null|
|2019-06-01T18:22:...|op_2|    null|2019-06-01T18:17:...|2019-06-01 19:17:...|2019-06-01 19:22:...|       null|        null|        null|
+--------------------+----+--------+--------------------+--------------------+--------------------+-----------+------------+------------+



In [91]:
# Flatten the operating-period events: pull the id and the parsed start/finish
# timestamps out of `data`, then drop the struct itself.
(
    df.filter(df["on"] == "operating_period")
    .withColumn("data_id", df["data"]["id"])
    .withColumn("date_start", df["data"]["date_start"])
    .withColumn("date_finish", df["data"]["date_finish"])
    .drop(df["data"])
    .show()
)

+--------------------+------+----------------+---------------+-------+--------------------+--------------------+
|                  at| event|              on|organization_id|data_id|          date_start|         date_finish|
+--------------------+------+----------------+---------------+-------+--------------------+--------------------+
|2019-06-01 19:17:...|create|operating_period|         org-id|   op_1|2019-06-01 19:23:...|2019-06-01 19:28:...|
|2019-06-01 19:17:...|create|operating_period|         org-id|   op_2|2019-06-01 19:17:...|2019-06-01 19:22:...|
+--------------------+------+----------------+---------------+-------+--------------------+--------------------+



In [92]:
import os
import awswrangler as wr
import pandas as pd

from geopy.distance import geodesic
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, udf, struct
from pyspark.sql.types import FloatType
from sqlalchemy import create_engine
# S3 bucket and key prefix of the raw event dumps.
# NOTE(review): no visible cell reads these. The Spark reads use the local copy
# under /tmp/loka-data, so presumably they fed a download step that is no longer
# in the notebook. Confirm before relying on Restart & Run All.
raw_data_bucket, raw_data_prefix = "de-tech-assessment-2022", "data"

# Spark datetime pattern for the ISO-8601 timestamps in the raw events:
# millisecond precision, with `X` parsing the trailing zone offset.
timestamp_format = "yyyy-MM-dd'T'HH:mm:ss.SSSX"


In [None]:
# getOrCreate() returns the already-active session when there is one, so
# re-running this cell does not start a second Spark context.
spark = SparkSession.builder.appName(
    "Loka Application"
).getOrCreate()


In [96]:
df = spark.read.option("mergeSchema", "true").json("/tmp/loka-data/*")

# Parse the ISO-8601 strings into timestamps. `at` is replaced in place. The three
# nested timestamps are staged as top-level columns, folded into the `data` struct
# after its original fields, and then dropped from the top level again.
df = (
    df.withColumn("at", to_timestamp(df["at"], timestamp_format))
    .withColumn("date_start", to_timestamp(df["data"]["start"], timestamp_format))
    .withColumn("date_finish", to_timestamp(df["data"]["finish"], timestamp_format))
    .withColumn(
        "location_at", to_timestamp(df["data"]["location"]["at"], timestamp_format)
    )
    .withColumn("data", struct("data.*", "date_start", "date_finish", "location_at"))
    .drop("date_start", "date_finish", "location_at")
)

# Vehicle events: flatten the id and the location fields, then drop the struct.
df_vehicle = (
    df.filter(df["on"] == "vehicle")
    .withColumn("data_id", df["data"]["id"])
    .withColumn("location_at", df["data"]["location_at"])
    .withColumn("location_lat", df["data"]["location"]["lat"])
    .withColumn("location_lng", df["data"]["location"]["lng"])
    .drop(df["data"])
)

# Operating-period events: flatten the id and the parsed start/finish timestamps.
df_operating_period = (
    df.filter(df["on"] == "operating_period")
    .withColumn("data_id", df["data"]["id"])
    .withColumn("date_start", df["data"]["date_start"])
    .withColumn("date_finish", df["data"]["date_finish"])
    .drop(df["data"])
)


                                                                                

In [95]:

# NOTE(review): toPandas() turns Spark timestamps into naive datetimes in the Spark
# session time zone. The earlier preview shows "18:28" parsed as "19:28", i.e. not
# UTC, so the warehouse receives local wall-clock times. Confirm that is intended.
pdf_vehicle = df_vehicle.toPandas()

# Warehouse connection URL. It is read from the environment so credentials need not
# live in the notebook. The fallback is the local development database the notebook
# was originally written against.
warehouse_url = os.environ.get(
    "DATAWAREHOUSE_URL",
    "postgresql+psycopg2://datawarehouse:datawarehouse@localhost/datawarehouse?client_encoding=utf8",
)
# Create SQLAlchemy engine
engine = create_engine(warehouse_url)

# Save result to the database via engine.
# NOTE(review): if_exists="append" is not idempotent. Every re-run of this cell
# inserts the same rows again. Consider truncating first or upserting on data_id.
pdf_vehicle.to_sql("vehicle", engine, index=False, if_exists="append")

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


349

In [97]:
# Convert the operating-period events to pandas and append them to the warehouse
# through the engine created for the vehicle load (re-running appends again).
pdf_operating_period = df_operating_period.toPandas()
pdf_operating_period.to_sql(
    name="operating_period",
    con=engine,
    index=False,
    if_exists="append",
)

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


2