## Refer the following video to mount AWS S3 to Databricks
<a href="https://www.youtube.com/watch?v=JRcgoyE_Tsc&ab_channel=DataFunX" target="_blank">Mount AWS S3 to Databricks</a>

In [0]:
aws_keys = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/awskeys/rootkey.csv")

In [0]:
ACCESS_KEY = aws_keys.select("Access key ID").take(1)[0]['Access key ID']
SECRET_KEY = aws_keys.select("Secret access key").take(1)[0]['Secret access key']

In [0]:
import urllib

ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
AWS_S3_BUCKET = 'olympic-data-280623'
MOUNT_NAME = '/mnt/mount_s3'

SOURCE_URL = "s3a://%s:%s@%s" %(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)

In [0]:
dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

Out[14]: True

In [0]:
%fs ls '/mnt/mount_s3'

path,name,size,modificationTime
dbfs:/mnt/mount_s3/athlete_events.csv,athlete_events.csv,23724678,1687925743000
dbfs:/mnt/mount_s3/athletes.csv,athletes.csv,6071413,1687925771000


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

# schema for the athletes table
athletes_schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=False),
    StructField("gender", StringType(), nullable=True),
    StructField("height", IntegerType(), nullable=True),
    StructField("weight", FloatType(), nullable=True),
    StructField("team", StringType(), nullable=False)
])
# creating atheletes dataframe 
athletes_df = spark.read.format("csv").option("header", "true").schema(athletes_schema).load("dbfs:/mnt/mount_s3/athletes.csv")
# athletes_df = athletes_df.na.drop()

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define the schema for the athlete_events table
athlete_events_schema = StructType([
    StructField("athlete_id", IntegerType(), nullable=False),
    StructField("games", StringType(), nullable=False),
    StructField("year", IntegerType(), nullable=True),
    StructField("season", StringType(), nullable=True),
    StructField("city", StringType(), nullable=True),
    StructField("sport", StringType(), nullable=True),
    StructField("event", StringType(), nullable=True),
    StructField("medal", StringType(), nullable=True)
])

# creating athlete_events dataframe 

athlete_events_df = spark.read.format("csv").option("header", "true").schema(athlete_events_schema).load("dbfs:/mnt/mount_s3/athlete_events.csv")

athlete_events_df = athlete_events_df.na.drop()

In [0]:
# Use this command in Snowflake Worksheet to Grant access

# -- GRANT SELECT ON TABLE OLYMPIC_DB.PUBLIC.ATHLETES TO ROLE ACCOUNTADMIN;

# -- GRANT SELECT ON TABLE OLYMPIC_DB.PUBLIC.ATHLETE_EVENTS TO ROLE ACCOUNTADMIN;

In [0]:
# Snowflake Connection credentials
sfOptions = {
  "sfUrl": "zf64891.central-india.azure.snowflakecomputing.com",
  "sfUser": "Snowflake-UserName",
  "sfPassword": "Snowflake-Password",
  "sfDatabase": "OLYMPIC_DB",
  "sfSchema": "PUBLIC",
  "sfWarehouse": "COMPUTE_WH"
}

In [0]:

# Specify the Snowflake table name
table_name = "athletes"

# Write the DataFrame to Snowflake
athletes_df.write.format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", table_name) \
    .mode("overwrite") \
    .options(header=True) \
    .save()


In [0]:

# Specify the Snowflake table name
table_name = "athlete_events"

# Write the DataFrame to Snowflake
athlete_events_df.write.format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", table_name) \
    .mode("overwrite") \
    .options(header=True) \
    .save()

In [0]:

# Specify the Snowflake table name
table_name = "athlete_events"

# Read the data written by the previous cell back.
df = spark.read \
  .format("snowflake") \
  .options(**sfOptions) \
  .option("dbtable", table_name) \
  .load()
 
display(df)

ATHLETE_ID,GAMES,YEAR,SEASON,CITY,SPORT,EVENT,MEDAL
120259,2016 Summer,2016,Summer,Rio de Janeiro,Table Tennis,Table Tennis Women's Team,
120260,1956 Summer,1956,Summer,Melbourne,Boxing,Boxing Men's Welterweight,Silver
120261,1968 Summer,1968,Summer,Mexico City,Athletics,Athletics Men's Decathlon,
120262,1992 Summer,1992,Summer,Barcelona,Athletics,Athletics Women's Long Jump,
120262,2000 Summer,2000,Summer,Sydney,Athletics,Athletics Women's Long Jump,
120263,1964 Winter,1964,Winter,Innsbruck,Luge,Luge Men's Singles,
120264,1904 Summer,1904,Summer,St. Louis,Wrestling,"Wrestling Men's Light-Flyweight, Freestyle",Bronze
120265,1948 Summer,1948,Summer,London,Hockey,Hockey Men's Hockey,Bronze
120265,1952 Summer,1952,Summer,Helsinki,Hockey,Hockey Men's Hockey,Silver
120266,1968 Summer,1968,Summer,Mexico City,Volleyball,Volleyball Men's Volleyball,


In [0]:
# Specify the Snowflake table name
table_name = "athletes"

# Read the data written by the previous cell back.
df = spark.read \
  .format("snowflake") \
  .options(**sfOptions) \
  .option("dbtable", table_name) \
  .load()
 
display(df)

ID,NAME,GENDER,HEIGHT,WEIGHT,TEAM
94427,Prisca Philip (-Polzine),F,,,Barbados
94428,Andrea Philipp (-Ziercke),F,165.0,60.0,Germany
94429,Hugo Philipp,M,,,Austria
94430,Lutz Philipp,M,178.0,69.0,Germany
94431,Luzius Philipp,M,170.0,77.0,Switzerland
94432,Peter Philipp,M,183.0,73.0,Switzerland
94433,Rainer Philipp,M,182.0,88.0,West Germany
94434,Ludo Ren Mathilde Philippaerts,M,180.0,85.0,Belgium
94435,Nicola Philippaerts,M,186.0,76.0,Belgium
94436,Bernard Philippe,M,179.0,73.0,Luxembourg
