In [54]:
from pathlib import Path
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, explode
from pyspark.sql.types import TimestampType


In [55]:


# Read the raw OpenAQ measurements from the S3 bucket.
s3_path = "s3a://dataminded-academy-capstone-resources/raw/open_aq/"

# Spark settings:
#  * spark.jars.packages pulls in the S3A filesystem, the Snowflake Spark
#    connector and the Snowflake JDBC driver.
#  * the credentials provider lets S3A use the default AWS credential chain.
# NOTE(review): getOrCreate() returns an already-running session if one exists,
# in which case spark.jars.packages is presumably not re-applied. That is the
# likely cause of a "Failed to find data source: net.snowflake.spark.snowflake"
# error on write -- restart the kernel before running this cell; TODO confirm.
config = {
    "spark.jars.packages": (
        "org.apache.hadoop:hadoop-aws:3.2.0,"
        "net.snowflake:spark-snowflake_2.12:2.9.0-spark_3.1,"
        "net.snowflake:snowflake-jdbc:3.13.3"
    ),
    "fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
}

conf = SparkConf().setAll(config.items())
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Load every JSON file under the prefix into a single DataFrame.
df = spark.read.json(s3_path)

                                                                                

In [56]:
# Inspect the schema Spark inferred from the raw JSON files.
df.printSchema()

root
 |-- city: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- country: string (nullable = true)
 |-- date: struct (nullable = true)
 |    |-- local: string (nullable = true)
 |    |-- utc: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- isAnalysis: boolean (nullable = true)
 |-- isMobile: boolean (nullable = true)
 |-- location: string (nullable = true)
 |-- locationId: long (nullable = true)
 |-- parameter: string (nullable = true)
 |-- sensorType: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- value: double (nullable = true)



In [57]:
## More straightforward alternative (suggested by Adriana): withColumn('longitude', col('coordinates.longitude')) for each nested field, then drop the original struct column

## From StackOverflow
def flatten_df(nested_df):
    """Return a copy of ``nested_df`` with its top-level struct columns flattened.

    Non-struct columns are kept as they are and come first. Every field of a
    struct column becomes its own column named ``<struct>_<field>``. Only one
    level of nesting is expanded: a struct inside a struct stays a struct.

    Parameters
    ----------
    nested_df : pyspark.sql.DataFrame
        DataFrame that may contain struct-typed columns.

    Returns
    -------
    pyspark.sql.DataFrame
        DataFrame with the plain columns followed by the expanded struct fields.
    """
    plain_names = []
    struct_names = []
    # dtypes yields (column name, type string) pairs; struct types start with "struct".
    for name, type_str in nested_df.dtypes:
        if type_str.startswith('struct'):
            struct_names.append(name)
        else:
            plain_names.append(name)

    expanded = []
    for struct_name in struct_names:
        # Selecting "<struct>.*" exposes the struct's field names as columns.
        field_names = nested_df.select(struct_name + '.*').columns
        for field_name in field_names:
            flattened = col(struct_name + '.' + field_name).alias(struct_name + '_' + field_name)
            expanded.append(flattened)

    return nested_df.select(plain_names + expanded)

# Flatten the nested `coordinates` and `date` structs and preview the result.
flat_df = flatten_df(df)
flat_df.show()

+----+-------+---------+----------+--------+--------------------+----------+---------+---------------+-----+-----+--------------------+---------------------+--------------------+--------------------+
|city|country|   entity|isAnalysis|isMobile|            location|locationId|parameter|     sensorType| unit|value|coordinates_latitude|coordinates_longitude|          date_local|            date_utc|
+----+-------+---------+----------+--------+--------------------+----------+---------+---------------+-----+-----+--------------------+---------------------+--------------------+--------------------+
|null|     BE|community|     false|   false|      Wilsele-Herent|     66110|      pm1|low-cost sensor|µg/m³|  1.7|              50.904|               4.6959|2021-02-02T22:59:...|2021-02-02T23:59:...|
|null|     BE|community|     false|   false|      Wilsele-Herent|     66110|     pm25|low-cost sensor|µg/m³|  4.1|              50.904|               4.6959|2021-02-02T22:59:...|2021-02-02T23:59:...|


                                                                                

In [58]:
# Convert the two date strings into proper Spark timestamps.
# NOTE(review): in the preview below both columns show the same value after the
# cast, so date_local no longer appears to carry the local wall-clock time --
# confirm this is intended.
flat_df = (
    flat_df
    .withColumn("date_local", col("date_local").cast(TimestampType()))
    .withColumn("date_utc", col("date_utc").cast(TimestampType()))
)

flat_df.show()

+----+-------+---------+----------+--------+--------------------+----------+---------+---------------+-----+-----+--------------------+---------------------+-------------------+-------------------+
|city|country|   entity|isAnalysis|isMobile|            location|locationId|parameter|     sensorType| unit|value|coordinates_latitude|coordinates_longitude|         date_local|           date_utc|
+----+-------+---------+----------+--------+--------------------+----------+---------+---------------+-----+-----+--------------------+---------------------+-------------------+-------------------+
|null|     BE|community|     false|   false|      Wilsele-Herent|     66110|      pm1|low-cost sensor|µg/m³|  1.7|              50.904|               4.6959|2021-02-02 23:59:53|2021-02-02 23:59:53|
|null|     BE|community|     false|   false|      Wilsele-Herent|     66110|     pm25|low-cost sensor|µg/m³|  4.1|              50.904|               4.6959|2021-02-02 23:59:53|2021-02-02 23:59:53|
|null|    

## Get secrets from AWS Secrets Manager
The same secret can be retrieved from the command line:
```bash
aws secretsmanager get-secret-value --secret-id snowflake/capstone/login
```

In [59]:
import botocore 
import botocore.session 
import json
from aws_secretsmanager_caching import SecretCache, SecretCacheConfig 

# Get credentials from AWS Secrets Manager
def get_credentials(secret_id='snowflake/capstone/login'):
    """Fetch a JSON secret from AWS Secrets Manager and return it parsed.

    Parameters
    ----------
    secret_id : str, optional
        Name (or ARN) of the secret to read. Defaults to the Snowflake
        capstone login secret, so existing ``get_credentials()`` calls keep
        working unchanged.

    Returns
    -------
    dict
        The secret string decoded from JSON.

    Raises
    ------
    json.JSONDecodeError
        If the stored secret string is not valid JSON.
    """
    # The region and credentials come from the default botocore session.
    client = botocore.session.get_session().create_client('secretsmanager')
    cache = SecretCache(config=SecretCacheConfig(), client=client)

    return json.loads(cache.get_secret_string(secret_id))

## Get secret log-in information
secrets_dict = get_credentials()
# Only the type is printed; the secret values themselves are not shown.
print(type(secrets_dict))

<class 'dict'>


In [63]:


## Load to Snowflake
# Connection options for the Snowflake Spark connector. Everything except the
# target schema is taken from the secret loaded above.
sfOptions = {
    "sfURL": secrets_dict["URL"],
    "sfUser": secrets_dict["USER_NAME"],
    "sfPassword": secrets_dict["PASSWORD"],
    "sfDatabase": secrets_dict["DATABASE"],
    "sfSchema": "VANESSA",
    "sfWarehouse": secrets_dict["WAREHOUSE"],
    "sfRole": secrets_dict["ROLE"],
}

# Append the flattened data to the `clean_data` table.
# NOTE: this needs the Snowflake connector jar on the Spark classpath; without
# it the save() call raises the ClassNotFoundException shown below.
(
    flat_df.write
    .format("net.snowflake.spark.snowflake")
    .options(**sfOptions)
    .option("dbtable", "clean_data")
    .mode("Append")
    .save()
)

Py4JJavaError: An error occurred while calling o888.save.
: java.lang.ClassNotFoundException: Failed to find data source: net.snowflake.spark.snowflake. Please find packages at http://spark.apache.org/third-party-projects.html
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:692)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:746)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:993)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:311)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: net.snowflake.spark.snowflake.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:666)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:666)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
	... 15 more


## Log in to the AWS Console
- IAM account: datamindedacademy
- User: Vanessa
- Password: shared on Slack