# Data from API with Python

In [None]:
import json

import pandas as pd
import requests
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [None]:
# Create (or reuse) the Spark session. getOrCreate() returns the session that
# is already active when the notebook host provides one and starts a new one
# otherwise, so this cell no longer depends on a pre-injected `sc` variable.
spark = SparkSession.builder.getOrCreate()

# Expose the underlying Spark context under the name the later cells use
sc = spark.sparkContext

In [None]:
# Specify the API endpoint and parameters
url = "https://api.openweathermap.org/data/2.5/forecast"
params = {"q": "London,uk", "appid": "<api-key>"}

# Make the API request; the timeout keeps the cell from hanging forever
# if the service does not answer
response = requests.get(url, params=params, timeout=30)

# Stop here on an HTTP error status (4xx/5xx) instead of handing an error
# payload to the cells that build the DataFrames
response.raise_for_status()

# Parse the response body as json
data = response.json()

##### Spark principles
At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

In [None]:
# Create an RDD holding the response serialized as a single JSON string.
# spark.read.json expects JSON text; a raw Python dict would be stringified
# as a Python repr (single quotes, None/True/False), which is not valid JSON.
rddjson = sc.parallelize([json.dumps(data)])

# Create a DataFrame (the schema is inferred from the JSON document)
df = spark.read.json(rddjson)

# View the DF
df.show()

We could also create the DataFrame from a sub-level of the JSON, for example the `list` array:
        
        # One JSON string per element of data["list"], so each element becomes a row
        rddjson1 = sc.parallelize([json.dumps(entry) for entry in data["list"]])
        df1 = spark.read.json(rddjson1)
        df1.show()

You can try this, but we will not use it in this practice.

## Write file from dataframe (options)

Option 3 is the one we will use in this practice. The first two are for you to explore their behavior. You can view the resulting files in the data folder for this session.

### Option 1: directly from spark dataframe

In [None]:
# Base path of the data lake container (also used by the cells below)
dlpath = 'abfs://<container-name>@<datalake-name>.dfs.core.windows.net'

# Save the Spark dataframe in json format under /raw, replacing any
# previous output at that location
(
    df.write
    .mode("overwrite")
    .format("json")
    .save(dlpath + "/raw/forecast.json")
)

### Option 2: Convert to pandas from spark and then to csv

Because Spark partitions data, writing a Spark DataFrame creates a directory containing several files, each holding a chunk of the data, in no particular order.
So, as an easier way to work with small DataFrames, we can convert the Spark DataFrame to a pandas DataFrame and write a single file.
Two other options are presented below. There are subtle differences in the way each method converts and writes to file.

In [None]:
# Spark -> pandas -> csv
# 'toPandas' (Spark) produces a pandas dataframe, on which the pandas
# method 'to_csv' is chained to write the file
(
    df.toPandas()
    .to_csv(
        dlpath + '/raw/toPandas_forecast.csv',
        index_label=False,
        lineterminator='\n',
        sep=',',
        header=True,
    )
)

### Option 3: Create pandas df from original data and then to csv

In [None]:
# Build a one-row pandas DataFrame straight from the parsed json response
data_csv = pd.DataFrame(data=[data])

# Write it to csv without the index column, then display one row
data_csv.to_csv(path_or_buf=dlpath + '/raw/pd_forecast.csv', index=False)
data_csv.sample(n=1)

# SQL Transformations

+ DDL statement for local sql server instance: create table and load data from csv

        %%sql -- magic command: change the language of the execution for this cell
        -- Landing table that receives the rows of pd_forecast.csv as text columns.
        -- It is named Weather_london_landing because that is the table the
        -- TRANSFORM 1 and TRANSFORM 2 statements update and read.
        -- NOTE(review): the WITH options below combine file-location settings
        -- (LOCATION, FILE_FORMAT, REJECT_*) with CSV parsing settings (FIELDQUOTE,
        -- FIRSTROW, FIELDTERMINATOR, ROWTERMINATOR) -- confirm that the target
        -- SQL engine accepts them on CREATE TABLE.
        CREATE TABLE Weather_london_landing (
            cod varchar(200),
            message varchar(200),
            cnt varchar(50),
            list varchar(max),   -- later parsed as JSON by TRANSFORM 2
            city varchar(500)
        )
        WITH (
            LOCATION = 'path/to/pd_forecast.csv',
            FILE_FORMAT = CSV,
            REJECT_TYPE = VALUE,
            REJECT_VALUE = 0,
            FIELDQUOTE = '"',
            FIRSTROW = 2, -- as 1st one is header
            FIELDTERMINATOR = ',',  --CSV field delimiter
            ROWTERMINATOR = '\n'   --Use to shift the control to next row
        );

+ Create a local final(cleaned) table in the sql pool

      %%sql
      -- Final (cleaned) table: one typed column per value that TRANSFORM 2
      -- extracts from the forecast JSON
      CREATE TABLE Weather_london (
          dt           INT,
          dt_txt       DATETIME,
          cloudiness   INT,
          precip_prob  FLOAT,
          humidity     FLOAT,
          "temp"       FLOAT,
          temp_max     FLOAT,
          temp_min     FLOAT,
          visibility   FLOAT,
          wind_speed   FLOAT
      );


+ TRANSFORM 1: Change single quotes inside the list field to double quotes

        %%sql
        -- Replace every single quote (CHAR(39)) in the list column with a double
        -- quote. The column must be referenced as an identifier: 'list' in quotes
        -- is a string literal, not a column name.
        -- Note: this also rewrites apostrophes that appear inside values.
        UPDATE Weather_london_landing
        SET list = REPLACE(list, CHAR(39), '"');

+ TRANSFORM 2 : Extract from JSON, convert data types, insert into final table

        %%sql
        DECLARE @json NVARCHAR(MAX);

        -- Load the list field of the landing table into @json
        -- (a scalar variable: only a single row's value is kept)
        SELECT @json = list FROM Weather_london_landing;

        -- Name the target and source columns explicitly. Weather_london declares
        -- humidity before temp, while the WITH clause below lists it after
        -- temp_min, so a positional INSERT ... SELECT * would store the values
        -- in the wrong columns.
        INSERT INTO Weather_london (
            dt, dt_txt, cloudiness, precip_prob, humidity,
            temp, temp_max, temp_min, visibility, wind_speed
        )
        SELECT
            dt, dt_txt, cloudiness, precip_prob, humidity,
            temp, temp_max, temp_min, visibility, wind_speed
        FROM OPENJSON ( @json )
        WITH (
            dt  int                 '$.dt',
            dt_txt  datetime        '$.dt_txt',
            cloudiness int          '$.clouds.all',
            precip_prob float       '$.pop',
            temp float              '$.main.temp',
            temp_max float          '$.main.temp_max',
            temp_min float          '$.main.temp_min',
            humidity int            '$.main.humidity',
            visibility float        '$.visibility',
            wind_speed float        '$.wind.speed'
        );