##### Both the AWS S3 and AWS Redshift services are currently running. You can use the services provided in this notebook or set up your own. If you set up your own, replace the access details in the code below with your own.

##### The command below unmounts your S3 bucket

In [3]:
##dbutils.fs.unmount("/mnt/s3data")

### Mount the S3 buckets that are acting as the source.
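
The mount cell itself is not shown in this notebook. The following is a minimal sketch of how such a mount is commonly done on Databricks, assuming the source bucket name `user-databricks` given in the access details at the end of the notebook and the mount point `/mnt/s3data` used by the other cells. `build_mount_source` is a hypothetical helper, and the key values are placeholders to replace with your own.

```python
from urllib.parse import quote


def build_mount_source(access_key, secret_key, bucket):
    """Build the s3a URI passed to dbutils.fs.mount (hypothetical helper)."""
    # A secret key may contain "/", which has to be URL-encoded inside a URI.
    encoded_secret = quote(secret_key, safe="")
    return "s3a://{}:{}@{}".format(access_key, encoded_secret, bucket)


# Placeholder credentials (assumptions): substitute your own values.
source = build_mount_source("YOUR_ACCESS_KEY", "YOUR/SECRET_KEY", "user-databricks")
print(source)

# On a Databricks cluster, where dbutils is predefined, the mount would then be:
# dbutils.fs.mount(source, "/mnt/s3data")
```

The `dbutils.fs.mount` call is left commented out because `dbutils` only exists inside a Databricks runtime.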

### Check the files in the mounted s3 bucket

In [6]:
display(dbutils.fs.ls("/mnt/s3data"))

path,name,size
dbfs:/mnt/s3data/execution_2020-01-01_17.csv,execution_2020-01-01_17.csv,39771


In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('IMSQL').getOrCreate()

### Reading Data and assigning column names

In [9]:
##df = spark.read.csv("/FileStore/tables/Uber_Data.csv",inferSchema=True,header=False,)
df = spark.read.csv("dbfs:/mnt/s3data/*.csv",inferSchema=True,header=False,)
df = df.withColumnRenamed("_c0", "vehicle_id")
df = df.withColumnRenamed("_c1", "function_id")
df = df.withColumnRenamed("_c2", "mode")
df = df.withColumnRenamed("_c3", "epoch")


In [10]:
df.show(5)

### Total number of rows in this batch and initial Dataframe

In [12]:
df.count()

### Dropping null values to create a new dataframe

In [14]:
df_logs = df.dropna(subset=('vehicle_id','function_id','mode','epoch'))

df_logs.count(),df_logs.show(5)

In [15]:
df_logs.count()

### Importing all required functions

In [17]:
from pyspark.sql.functions import datediff,date_format,to_date,to_timestamp,quarter,year,month
from pyspark.sql.types import DecimalType, DateType, TimestampType
from pyspark.sql.types import StringType
from pyspark.sql import functions as f
from pyspark.sql import types as t

### Extracting TimeStamp and Date columns

In [19]:
df_logs = df_logs.withColumn("Date_Time",df_logs["epoch"].cast(TimestampType()))
df_logs = df_logs.withColumn("Dates",df_logs["Date_Time"].cast(DateType()))
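
Casting a numeric column to `TimestampType` makes Spark interpret the value as seconds since the Unix epoch, and casting that timestamp to `DateType` keeps only the calendar date. The same conversion in plain Python is sketched below, assuming `epoch` holds seconds rather than milliseconds. The sample value is made up for illustration.

```python
from datetime import datetime, timezone

# Hypothetical sample value: seconds since 1970-01-01 00:00:00 UTC.
epoch = 1577898000

# Equivalent of the cast to TimestampType (shown here in UTC).
date_time = datetime.fromtimestamp(epoch, tz=timezone.utc)

# Equivalent of the cast to DateType: drop the time of day.
dates = date_time.date()

print(date_time.isoformat(), dates.isoformat())
# 2020-01-01T17:00:00+00:00 2020-01-01
```

Spark renders timestamps in the session time zone, so the values displayed by `df_logs.show()` may be shifted relative to UTC. If the source column were in milliseconds, it would need to be divided by 1000 before the cast.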

In [20]:
columns_to_drop = ['epoch']
df_logs = df_logs.drop(*columns_to_drop)

### Final Dataframe

In [22]:
df_logs.show(5)

### Creating Fact table for Logs

In [24]:
# Drop any table left over from a previous run, then save the current batch
spark.sql("drop table if exists df_logs")
df_logs.write.saveAsTable("df_logs")
  

In [25]:
%sql
describe df_logs

col_name,data_type,comment
vehicle_id,string,
function_id,string,
mode,string,
Date_Time,timestamp,
Dates,date,


### Creating Vehicle Dimension

In [27]:
# Drop any table left over from a previous run, then rebuild the dimension from the fact table
spark.sql("drop table if exists df_vehicle")
df_vehicle=spark.sql("select distinct vehicle_id,'N/A' as vehicle_model,'N/A' as brand, 'N/A' as vehicle_type from df_logs")
df_vehicle.write.saveAsTable("df_vehicle")

### Creating Function Dimension

In [29]:
# Drop any table left over from a previous run, then rebuild the dimension from the fact table
spark.sql("drop table if exists df_function")
df_function=spark.sql("select distinct function_id,'N/A' as function_name,'N/A' as function_type,'N/A' as level from df_logs")
df_function.write.saveAsTable("df_function")

### Setting up Hadoop config for accessing S3 buckets

In [31]:
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAVPNAKGPOLMQLIZXN")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "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")

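# The s3a connector reads its credentials from fs.s3a.access.key / fs.s3a.secret.key
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", hadoop_conf.get("fs.s3n.awsAccessKeyId"))
hadoop_conf.set("fs.s3a.secret.key", hadoop_conf.get("fs.s3n.awsSecretAccessKey"))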

##### Install psycopg2 to connect to Redshift from Python if the cells below report that a package or library was not found

In [33]:
#pip install psycopg2

### Importing required packages

In [35]:
import csv, ast, psycopg2

### The 3 blocks below are SQL scripts for creating the 2 dimension tables (vehicle, function) and logs fact table

In [37]:
query_dim_vehicle="create table if not exists vehicle_dim (vehicle_id varchar(20) primary key, vehicle_model varchar(20), brand varchar(20), vehicle_type varchar(25));"

In [38]:
query_dim_function="create table if not exists function_dim (function_id varchar(20) primary key, function_name varchar(20), function_type varchar(20), level varchar(10));"

In [39]:
query_log="create table if not exists logs_fact (id integer identity (1,1) primary key, vehicle_id varchar(20), function_id varchar(20), mode varchar(10), Date_Time timestamp, Dates date,  foreign key(vehicle_id) references vehicle_dim(vehicle_id), foreign key (function_id) references function_dim(function_id));"
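
The three statements above describe a small star schema: one fact table whose `vehicle_id` and `function_id` columns point at two dimension tables. The sketch below reproduces that layout in an in-memory SQLite database so the relationships can be tried without a cluster. SQLite is a stand-in chosen for illustration, its DDL differs slightly from Redshift's (`autoincrement` instead of `identity (1,1)`), and the sample rows are made up.

```python
import sqlite3

# In-memory SQLite stands in for Redshift here (an assumption for illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when asked
conn.executescript("""
create table vehicle_dim (vehicle_id varchar(20) primary key, vehicle_model varchar(20),
                          brand varchar(20), vehicle_type varchar(25));
create table function_dim (function_id varchar(20) primary key, function_name varchar(20),
                           function_type varchar(20), level varchar(10));
create table logs_fact (id integer primary key autoincrement,
                        vehicle_id varchar(20) references vehicle_dim(vehicle_id),
                        function_id varchar(20) references function_dim(function_id),
                        mode varchar(10), Date_Time timestamp, Dates date);
""")

# Hypothetical sample rows: dimensions first, because foreign keys are enforced here.
conn.executemany("insert into vehicle_dim values (?, 'N/A', 'N/A', 'N/A')", [("v1",), ("v2",)])
conn.execute("insert into function_dim values ('f1', 'N/A', 'N/A', 'N/A')")
conn.executemany(
    "insert into logs_fact (vehicle_id, function_id, mode, Date_Time, Dates) "
    "values (?, ?, ?, ?, ?)",
    [("v1", "f1", "auto", "2020-01-01 17:00:00", "2020-01-01"),
     ("v1", "f1", "manual", "2020-01-01 17:05:00", "2020-01-01"),
     ("v2", "f1", "auto", "2020-01-01 17:10:00", "2020-01-01")])

# A typical star-schema query: count fact rows per vehicle via the dimension.
rows_per_vehicle = conn.execute(
    "select v.vehicle_id, count(*) from logs_fact l "
    "join vehicle_dim v on l.vehicle_id = v.vehicle_id "
    "group by v.vehicle_id order by v.vehicle_id").fetchall()
print(rows_per_vehicle)
# [('v1', 2), ('v2', 1)]
conn.close()
```

Redshift treats primary key and foreign key constraints as informational and does not enforce them, which is why this notebook can load `logs_fact` before the dimension tables. With enforcement switched on, as in this sketch, the dimensions have to be loaded first.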

### Creating the tables on the Redshift cluster

In [41]:
conn = psycopg2.connect(
    host="redshift-cluster-1.cyy4onvftqwb.us-east-1.redshift.amazonaws.com",
    user="masteruser",
    port=5439,
    password="Jobs2692",
    dbname="mydb")

cur = conn.cursor()

cur.execute(query_dim_vehicle)
cur.execute(query_dim_function)
cur.execute(query_log)
conn.commit()
cur.close()
conn.close()

### Loading Fact Table
The "tempdir" option names the S3 bucket that the connector uses as a temporary directory for intermediate read and write operations between Spark and Redshift.
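
The Redshift read and write cells in this notebook pass the same `url`, `tempdir`, and `aws_iam_role` options each time. One way to avoid the repetition is to build the options once as a dictionary. The sketch below assumes a hypothetical helper `redshift_options`, and the URL and role values are placeholders, not the notebook's actual settings.

```python
def redshift_options(jdbc_url, tempdir, iam_role, dbtable=None, query=None):
    """Collect connector options in one dict (hypothetical helper)."""
    # A read or write targets either a table or a query, never both.
    if (dbtable is None) == (query is None):
        raise ValueError("pass exactly one of dbtable or query")
    options = {"url": jdbc_url, "tempdir": tempdir, "aws_iam_role": iam_role}
    if dbtable is not None:
        options["dbtable"] = dbtable
    else:
        options["query"] = query
    return options


# Placeholder connection details (assumptions): substitute your own.
opts = redshift_options(
    jdbc_url="jdbc:redshift://<host>:5439/<database>?user=<user>&password=<password>",
    tempdir="s3n://uber-redshift-logs",
    iam_role="<iam-role-arn>",
    dbtable="logs_fact",
)
print(sorted(opts))

# In the notebook the dict could then be unpacked into the writer, for example:
# df_logs.write.format("com.databricks.spark.redshift").options(**opts).mode("append").save()
```

Keeping the connection string in one place also means the password appears once rather than in every cell.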

In [43]:
df_logs.write \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://redshift-cluster-1.cyy4onvftqwb.us-east-1.redshift.amazonaws.com:5439/mydb?user=masteruser&password=Jobs2692") \
.option("dbtable", "logs_fact") \
.option("tempdir", "s3n://uber-redshift-logs") \
.option("aws_iam_role","arn:aws:iam::376682722268:role/Uber") \
.mode("append") \
.save()
print("load successful")

### Loading the Vehicle dimension

In [45]:
# Remove staging tables left over from a previous run
spark.sql("drop table if exists df_vehicle_match")
spark.sql("drop table if exists df_vehicle_nomatch")

df_vehicle_match = spark.read \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://redshift-cluster-1.cyy4onvftqwb.us-east-1.redshift.amazonaws.com:5439/mydb?user=masteruser&password=Jobs2692") \
.option("query", "select distinct vehicle_id from vehicle_dim") \
.option("tempdir", "s3n://uber-redshift-logs").option("aws_iam_role","arn:aws:iam::376682722268:role/Uber") \
.load()

count1=df_vehicle_match.count()

if count1==0:
  df_vehicle.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshift-cluster-1.cyy4onvftqwb.us-east-1.redshift.amazonaws.com:5439/mydb?user=masteruser&password=Jobs2692") \
  .option("dbtable", "vehicle_dim") \
  .option("tempdir", "s3n://uber-redshift-logs") \
  .option("aws_iam_role","arn:aws:iam::376682722268:role/Uber") \
  .mode("append") \
  .save()
  print("load successful")
else:
  df_vehicle_match.write.saveAsTable("df_vehicle_match")
  df_vehicle_nomatch=spark.sql("select a.vehicle_id, a.vehicle_model, a.brand, a.vehicle_type from df_vehicle a left join df_vehicle_match b on a.vehicle_id=b.vehicle_id where b.vehicle_id is null")
  df_vehicle_nomatch.write.saveAsTable("df_vehicle_nomatch")
  df_vehicle_nomatch.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshift-cluster-1.cyy4onvftqwb.us-east-1.redshift.amazonaws.com:5439/mydb?user=masteruser&password=Jobs2692") \
  .option("dbtable", "vehicle_dim") \
  .option("tempdir", "s3n://uber-redshift-logs") \
  .option("aws_iam_role","arn:aws:iam::376682722268:role/Uber") \
  .mode("append") \
  .save()
  print("load successful")
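
The `left join ... where b.vehicle_id is null` query above is an anti-join: it keeps only the vehicles from the current batch that are not yet present in `vehicle_dim`, so repeated runs do not insert duplicates. The idea is sketched below in plain Python with made-up rows; `rows_not_yet_loaded` is a hypothetical helper, not part of the notebook.

```python
def rows_not_yet_loaded(new_rows, existing_ids, key="vehicle_id"):
    """Anti-join: keep rows whose key is absent from existing_ids."""
    existing = set(existing_ids)
    return [row for row in new_rows if row[key] not in existing]


# Hypothetical batch of dimension rows and ids already stored in Redshift.
batch = [
    {"vehicle_id": "v1", "vehicle_model": "N/A", "brand": "N/A", "vehicle_type": "N/A"},
    {"vehicle_id": "v2", "vehicle_model": "N/A", "brand": "N/A", "vehicle_type": "N/A"},
    {"vehicle_id": "v3", "vehicle_model": "N/A", "brand": "N/A", "vehicle_type": "N/A"},
]
already_in_dim = ["v1", "v3"]

to_insert = rows_not_yet_loaded(batch, already_in_dim)
print([row["vehicle_id"] for row in to_insert])
# ['v2']
```

With an empty `already_in_dim` the function returns the whole batch, which is the case the `count1==0` branch handles separately. In PySpark the same filter can be written without staging tables as `df_vehicle.join(df_vehicle_match, "vehicle_id", "left_anti")`.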
  

### Loading the Function dimension

In [47]:
# Remove staging tables left over from a previous run
spark.sql("drop table if exists df_function_match")
spark.sql("drop table if exists df_function_nomatch")
  
df_function_match = spark.read \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://redshift-cluster-1.cyy4onvftqwb.us-east-1.redshift.amazonaws.com:5439/mydb?user=masteruser&password=Jobs2692") \
.option("query", "select distinct function_id from function_dim") \
.option("tempdir", "s3n://uber-redshift-logs").option("aws_iam_role","arn:aws:iam::376682722268:role/Uber") \
.load()

count2=df_function_match.count()

if count2==0:
  df_function.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshift-cluster-1.cyy4onvftqwb.us-east-1.redshift.amazonaws.com:5439/mydb?user=masteruser&password=Jobs2692") \
  .option("dbtable", "function_dim") \
  .option("tempdir", "s3n://uber-redshift-logs") \
  .option("aws_iam_role","arn:aws:iam::376682722268:role/Uber") \
  .mode("append") \
  .save()
  print("load successful")
else:
  df_function_match.write.saveAsTable("df_function_match")
  df_function_nomatch=spark.sql("select a.function_id, a.function_name, a.function_type, a.level from df_function a left join df_function_match b on a.function_id=b.function_id where b.function_id is null")
  df_function_nomatch.write.saveAsTable("df_function_nomatch")
  df_function_nomatch.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshift-cluster-1.cyy4onvftqwb.us-east-1.redshift.amazonaws.com:5439/mydb?user=masteruser&password=Jobs2692") \
  .option("dbtable", "function_dim") \
  .option("tempdir", "s3n://uber-redshift-logs") \
  .option("aws_iam_role","arn:aws:iam::376682722268:role/Uber") \
  .mode("append") \
  .save()
  print("load successful")

##### You can now check the tables in your Redshift data warehouse. Further analysis can be done on the data with the platform or tool of your choice. To connect to this Redshift cluster from a SQL editor such as SQL Workbench, use the credentials below.

#### Details for accessing the Redshift cluster

###### JDBC URL: jdbc:redshift://redshift-cluster-1.cyy4onvftqwb.us-east-1.redshift.amazonaws.com:5439/mydb
###### Username: masteruser
###### Password: Jobs2692
###### Link for downloading the JDBC jar files: https://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html (on that page, select the file labelled "JDBC4.2–compatible driver (without the AWS SDK) and driver dependent libraries for AWS SDK files version 1.2.47")
###### Extract the jar named "RedshiftJDBC42-no-awssdk-1.2.47.1071" from the downloaded zip file and save it to a location on your system that you can access.

#### Details for accessing the S3 buckets

##### Use the credentials below to access both the data source bucket "user-databricks" and the intermediate bucket "uber-redshift-logs", which this code uses as its temporary directory.
###### ACCESS_KEY= AKIAVPNAKGPOLMQLIZXN
###### SECRET_KEY= 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