**1. Use Spark and Databricks to run ELT processes by creating fact tables**
The fact table Python scripts should contain the appropriate keys from the dimensions and should generate the correct facts based on the diagrams provided in the first step.

**2. Use Spark and Databricks to run ELT processes by creating dimension tables**
The dimension Python scripts should match the schema diagram. Dimensions should generate appropriate keys and should not contain facts. 

**3. Produce Spark code in Databricks using Jupyter Notebooks and Python scripts**
At minimum, the transform scripts should write in Delta format, use overwrite mode, and **save the result as a table in Delta**.
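
The three requirements above recur in every cell of this notebook, so they can be sketched once as a small helper. The name `write_gold` is hypothetical and does not appear in the notebook; `df` is assumed to be a Spark DataFrame.

```python
def write_gold(df, table_name: str) -> None:
    """Write `df` as a managed Delta table, replacing any existing contents.

    `write_gold` is a hypothetical helper name; `df` is assumed to be a
    Spark DataFrame such as the ones returned by spark.sql(...).
    """
    (
        df.write
        .format("delta")         # write to Delta
        .mode("overwrite")       # use overwrite mode
        .saveAsTable(table_name) # save as a table
    )
```

In a notebook cell this would be called as, for example, `write_gold(silverdf, "dimRider")`.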

## BUSINESS OUTCOMES

1. Analyze how much time is spent per ride, based on:

   * date and time factors such as day of week and time of day
   * which station is the starting and/or ending station
   * age of the rider at the time of the ride
   * whether the rider is a (paying) member or a casual rider

2. Analyze how much money is spent:

   * per month, quarter, and year
   * per member, based on the age of the rider at account start
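
Both outcomes depend on the rider's age at a reference date (ride start or account start). As a minimal plain-Python sketch of that arithmetic, the hypothetical function `age_at` below counts completed years. Whether the SQL expression used later in the notebook rounds the same way is worth checking against the Databricks documentation.

```python
from datetime import date


def age_at(birthday: date, on: date) -> int:
    """Return the number of completed years between `birthday` and `on`."""
    years = on.year - birthday.year
    # Subtract one if the birthday has not yet occurred in the year of `on`.
    if (on.month, on.day) < (birthday.month, birthday.day):
        years -= 1
    return years


# A rider born 1990-06-15 is still 30 on the day before their 31st birthday.
print(age_at(date(1990, 6, 15), date(2021, 6, 14)))
print(age_at(date(1990, 6, 15), date(2021, 6, 15)))
```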

In [0]:
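silver = "silver.riders"
gold = "dimRider"

# Load the riders table from the silver layer and preview it.
silverdf = spark.table(silver)
silverdf.show(5)

# Persist it as the rider dimension: Delta format, overwrite mode, saved as a table.
silverdf.write.format("delta").mode("overwrite").saveAsTable(gold)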

In [0]:
silver = "silver.stations"
gold = "dimStation"

# Intended schema, kept for reference (not executed):
#create = "CREATE TABLE %s (id VARCHAR(50), name VARCHAR(150), \
#                           latitude FLOAT, longitude FLOAT )" % (gold)

# Load the stations table from the silver layer and preview it.
silverdf = spark.table(silver)
silverdf.show(5)

# Path-based alternative (not used):
#silverdf.write.mode("overwrite").save(f"/delta/{gold}")
silverdf.write.format("delta").mode("overwrite").saveAsTable(gold)


In [0]:
from dateutil.relativedelta import relativedelta

trips = "silver.trips"
gold = "dimDate"

# Date range: from the first trip start to 12 months past the last trip end.
beginDate, endDate = spark.sql(f"""
    SELECT min(to_date(start_at)) AS beginDate,
           add_months(max(to_date(ended_at)), 12) AS endDate
    FROM {trips}
""").first()
# Extend the range by a further 24 months (36 months past the last trip in total).
endDate = endDate + relativedelta(months=24)

# One row per hour between beginDate and endDate, exposed as the temp view `dates`.
spark.sql(f"""
    SELECT explode(sequence(to_timestamp('{beginDate}'),
                            to_timestamp('{endDate}'),
                            interval 1 hour)) AS ts
""").createOrReplaceTempView("dates")

create = """
create or replace table dimDate
-- USING delta
-- LOCATION '/delta/dimDate'
as select
  ts,
  hour(ts) AS hour,
  dayofweek(ts) as dayofweek,
  dayofmonth(ts) as dayofmonth,
  weekofyear(ts) as weekofyear,
  month(ts) as month,
  quarter(ts) as quarter,
  year(ts) AS year
from
  dates
"""

spark.sql(create)
spark.sql("optimize dimDate zorder by (ts)")
df = spark.sql("select * from dimDate")
df.show(5)
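# spark.sql() already returns a DataFrame, so it can be written like any other.
# Note: this rewrites the same table that CREATE OR REPLACE TABLE created above.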
df.write.format("delta").mode("overwrite").saveAsTable(f"{gold}")
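
To make the `dimDate` columns concrete, here is a plain-Python sketch of the same attributes for a single timestamp. The function name `date_attributes` is hypothetical. The sketch assumes the Spark SQL conventions that `dayofweek` returns 1 for Sunday through 7 for Saturday and that `weekofyear` follows ISO 8601 week numbering; Python's own weekday numbering differs, hence the conversion.

```python
from datetime import datetime


def date_attributes(ts: datetime) -> dict:
    """Mirror the dimDate columns for one timestamp (illustrative only)."""
    return {
        "ts": ts,
        "hour": ts.hour,
        # isoweekday(): Monday=1 ... Sunday=7; convert to Sunday=1 ... Saturday=7.
        "dayofweek": ts.isoweekday() % 7 + 1,
        "dayofmonth": ts.day,
        # ISO 8601 week number (weeks start on Monday).
        "weekofyear": ts.isocalendar()[1],
        "month": ts.month,
        "quarter": (ts.month - 1) // 3 + 1,
        "year": ts.year,
    }


# Sunday, 7 February 2021 at 13:00.
row = date_attributes(datetime(2021, 2, 7, 13, 0))
print(row["dayofweek"], row["weekofyear"], row["quarter"])
```

This matters when analyzing rides by day of week: a filter such as `dayofweek = 1` selects Sundays under this convention, not Mondays.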


In [0]:
silver = "silver.payments"
gold = "factPayment"

# Intended schema, kept for reference (not executed):
#create = "CREATE TABLE %s (id INT, date DATE, amount FLOAT, rider INT)" % (gold)
#spark.sql(create)

# Load the payments table from the silver layer and preview it.
silverdf = spark.table(silver)
silverdf.show(5)

# Path-based alternative (not used):
#silverdf.write.mode("overwrite").save(f"/delta/{gold}")
silverdf.write.format("delta").mode("overwrite").saveAsTable(gold)

In [0]:
import pyspark.sql.functions as F

silver = "silver.trips"
rider = "silver.riders"
gold = "factTrip"

# Join trips to riders so the rider's age at the time of the ride becomes a fact.
joineddf = spark.sql(f"""
    SELECT t.id, t.start_at, t.ended_at, t.duration,
           t.start_station, t.dest_station, t.rideable_type, t.rider_id,
           CAST(datediff(year, r.birthday, t.start_at) AS INTEGER) AS rider_age
    FROM {silver} AS t
    LEFT JOIN {rider} AS r ON t.rider_id = r.id
""")

# Recompute duration in seconds. Assuming start_at and ended_at are timestamps,
# casting them to int yields epoch seconds, so the difference is the ride length.
df = joineddf.withColumn("duration", F.col("ended_at").cast("int") - F.col("start_at").cast("int"))
df.show(5, truncate=False)
df.write.format("delta").mode("overwrite").saveAsTable(gold)
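
Since the fact tables are expected to carry keys from the dimensions, a quick sanity check is to look for fact rows whose key has no matching dimension row. The sketch below only builds the SQL text. The helper name `orphan_check_sql` is hypothetical, and the key columns are assumptions taken from the cells above (`rider_id`, `start_station` and `dest_station` in `factTrip`, `id` in the dimensions).

```python
def orphan_check_sql(fact: str, fact_key: str, dim: str, dim_key: str) -> str:
    """Build a query returning fact rows with no matching dimension row."""
    return (
        f"SELECT f.* FROM {fact} AS f "
        f"LEFT ANTI JOIN {dim} AS d ON f.{fact_key} = d.{dim_key}"
    )


# Assumed key columns; adjust to the actual schema diagram.
checks = [
    orphan_check_sql("factTrip", "rider_id", "dimRider", "id"),
    orphan_check_sql("factTrip", "start_station", "dimStation", "id"),
    orphan_check_sql("factTrip", "dest_station", "dimStation", "id"),
]

for query in checks:
    print(query)
    # In a Databricks notebook, each query could be run with:
    #   spark.sql(query).count()
```

A count of zero for every query would suggest that each trip resolves to a rider and to both stations. Rows with a null key also show up as orphans, which is usually the desired behavior for this kind of check.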