In [0]:
# start by importing library
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import types
spark = SparkSession.builder.appName('Basics').getOrCreate()
import requests
import json
from datetime import datetime

In [0]:
# gather all source root urls in a JSON  and validate them visually
source_url= requests.get('https://swapi.dev/api')
source_url.status_code
source_url_json=source_url.json()
source_url_json

Out[20]: {'people': 'https://swapi.dev/api/people/',
 'planets': 'https://swapi.dev/api/planets/',
 'films': 'https://swapi.dev/api/films/',
 'species': 'https://swapi.dev/api/species/',
 'vehicles': 'https://swapi.dev/api/vehicles/',
 'starships': 'https://swapi.dev/api/starships/'}

In [0]:
# Only during development, when clean up needed, for ad hock deletion of raw layer tables
#spark.sql("use starwar_raw")
#spark.sql("drop table films")
#spark.sql("drop table people")
#spark.sql("drop table planets")
#spark.sql("drop table species")
#spark.sql("drop table starships")
#spark.sql("drop table vehicles")
#spark.sql("drop database starwar_raw")

In [0]:

spark.sql("show databases").show()

+---------------+
|   databaseName|
+---------------+
|        curated|
|        default|
|        starwar|
|starwar_curated|
|    starwar_raw|
+---------------+



In [0]:
%fs ls /FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/ContainsNull.csv,ContainsNull.csv,61,1670433563000
dbfs:/FileStore/tables/appl_stock.csv,appl_stock.csv,143130,1670425610000
dbfs:/FileStore/tables/films/,films/,0,1671683337105
dbfs:/FileStore/tables/people/,people/,0,1671683337105
dbfs:/FileStore/tables/people-2.json,people-2.json,0,1670360373000
dbfs:/FileStore/tables/people-3.json,people-3.json,73,1670370846000
dbfs:/FileStore/tables/people.json,people.json,0,1670358962000
dbfs:/FileStore/tables/planets/,planets/,0,1671683337105
dbfs:/FileStore/tables/sales_info.csv,sales_info.csv,196,1670429071000
dbfs:/FileStore/tables/species/,species/,0,1671683337105


In [0]:
# These SQLs was used for the first time to create database. We will need to execute them for the first time while running in PROD for the first time
#%sql create database starwar_raw;
#%sql create database starwar_curated;


In [0]:
#################################################################################################################################################
### Ingestion of API data from source into raw layer
### Assumption - for testing purpose, range of 0 to 100 for each api is enough. 
###              for PROD runs, we will have bigger cluster so we can identify actual
###              range by running one time loop to get the range. Or we can get the range from SMEs and business owner.
#################################################################################################################################################

### Framework configurations fields
load_type = 'full'
raw_dbname = 'starwar_raw'
curated_dbname = 'starwar_curated'
main_directory = '/FileStore/tables/'
start_limit = 1
end_limit = 100

### Full clean up raw layer, only in developement region
#DROP database IF EXISTS curated CASCADE

### Creation of database and its usage

#spark.sql(f"create database {raw_dbname}")
spark.sql(f"use {raw_dbname}")

import json
import requests

### business logic

def ingest_swapi_data(url, start_limit, end_limit):
    """ Call the SWAPI and create a list of data. """
    data_set = []
    for number in range(start_limit, end_limit):
        response = requests.get(f"{url}{number}/")

        if response.status_code != 200:
            continue
        else:
            data = response.json()

            data_set.append(data)
    return data_set

# capture current timestamp

def get_timestamp():   
    now = datetime.now()
    current_time = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Current Time is :", current_time)
    return current_time

### Logic to maintain date for next run of curated zone starts here.
### This will store the current timestamp for the next delta / incremental load.
### I have created seperate cells for delta load in below code

run_time = get_timestamp()
run_time_lst =spark.createDataFrame([[run_time]])
#run_time_lst.write.mode('overwrite').saveAsTable('last_load_time')
    
### Loop through the resources and url dictionary

for resource, url in source_url_json.items():
    print(resource)
#   response variable contains list of all jsons of sopecific resource
    response = ingest_swapi_data(url, start_limit, end_limit)
# converting response data into spark dataframe 'df'
    df=spark.createDataFrame(response)
#    df.printSchema()  for testing and validation only

#    filepath = main_directory + resource +'/' + run_time + '/' --needed only if we need to maintain history in json files. This could help in identifying delete scenarios whete we might need to compare current data with yesterday's or laste runs data

#    overwtite mode or drop and recreate is used because we will need to load full data every time in raw zone. This is because we do not have any other 
#    mechanism than using fields 'edited' and 'created' for delta detection.
#    df.write.format('delta').mode('overwrite').saveAsTable(resource)
    spark.sql(f"drop table {resource} ")
    df.write.format('delta').saveAsTable(resource)



Current Time is : 2022-12-22 04:28:57
people
planets
films
species
vehicles
starships


In [0]:
#Test - Validate database, table and respective counts
spark.sql("show tables").show()
spark.sql("select 'people', count(*) from people").show()
spark.sql("select 'films', count(*) from films").show()
spark.sql("select 'planets', count(*) from planets").show()
spark.sql("select 'species', count(*) from species").show()
spark.sql("select 'starships', count(*) from starships").show()
spark.sql("select 'vehicles', count(*) from vehicles").show()

+-----------+----------------+-----------+
|   database|       tableName|isTemporary|
+-----------+----------------+-----------+
|starwar_raw|      characters|      false|
|starwar_raw|characters_delta|      false|
|starwar_raw|           films|      false|
|starwar_raw|            keys|      false|
|starwar_raw|  last_load_time|      false|
|starwar_raw|          people|      false|
|starwar_raw|         planets|      false|
|starwar_raw|    related_keys|      false|
|starwar_raw|         species|      false|
|starwar_raw|       star_keys|      false|
|starwar_raw|       starships|      false|
|starwar_raw|        vehicles|      false|
+-----------+----------------+-----------+

+------+--------+
|people|count(1)|
+------+--------+
|people|      82|
+------+--------+

+-----+--------+
|films|count(1)|
+-----+--------+
|films|       6|
+-----+--------+

+-------+--------+
|planets|count(1)|
+-------+--------+
|planets|      60|
+-------+--------+

+-------+--------+
|species|count(1)|


In [0]:
%sql select * from people

birth_year,created,edited,eye_color,films,gender,hair_color,height,homeworld,mass,name,skin_color,species,starships,url,vehicles
19BBY,2014-12-09T13:50:51.644000Z,2014-12-20T21:17:56.891000Z,blue,"List(https://swapi.dev/api/films/1/, https://swapi.dev/api/films/2/, https://swapi.dev/api/films/3/, https://swapi.dev/api/films/6/)",male,blond,172,https://swapi.dev/api/planets/1/,77,Luke Skywalker,fair,List(),"List(https://swapi.dev/api/starships/12/, https://swapi.dev/api/starships/22/)",https://swapi.dev/api/people/1/,"List(https://swapi.dev/api/vehicles/14/, https://swapi.dev/api/vehicles/30/)"
112BBY,2014-12-10T15:10:51.357000Z,2014-12-20T21:17:50.309000Z,yellow,"List(https://swapi.dev/api/films/1/, https://swapi.dev/api/films/2/, https://swapi.dev/api/films/3/, https://swapi.dev/api/films/4/, https://swapi.dev/api/films/5/, https://swapi.dev/api/films/6/)",,,167,https://swapi.dev/api/planets/1/,75,C-3PO,gold,List(https://swapi.dev/api/species/2/),List(),https://swapi.dev/api/people/2/,List()
33BBY,2014-12-10T15:11:50.376000Z,2014-12-20T21:17:50.311000Z,red,"List(https://swapi.dev/api/films/1/, https://swapi.dev/api/films/2/, https://swapi.dev/api/films/3/, https://swapi.dev/api/films/4/, https://swapi.dev/api/films/5/, https://swapi.dev/api/films/6/)",,,96,https://swapi.dev/api/planets/8/,32,R2-D2,"white, blue",List(https://swapi.dev/api/species/2/),List(),https://swapi.dev/api/people/3/,List()
41.9BBY,2014-12-10T15:18:20.704000Z,2014-12-20T21:17:50.313000Z,yellow,"List(https://swapi.dev/api/films/1/, https://swapi.dev/api/films/2/, https://swapi.dev/api/films/3/, https://swapi.dev/api/films/6/)",male,none,202,https://swapi.dev/api/planets/1/,136,Darth Vader,white,List(),List(https://swapi.dev/api/starships/13/),https://swapi.dev/api/people/4/,List()
19BBY,2014-12-10T15:20:09.791000Z,2014-12-20T21:17:50.315000Z,brown,"List(https://swapi.dev/api/films/1/, https://swapi.dev/api/films/2/, https://swapi.dev/api/films/3/, https://swapi.dev/api/films/6/)",female,brown,150,https://swapi.dev/api/planets/2/,49,Leia Organa,light,List(),List(),https://swapi.dev/api/people/5/,List(https://swapi.dev/api/vehicles/30/)
52BBY,2014-12-10T15:52:14.024000Z,2014-12-20T21:17:50.317000Z,blue,"List(https://swapi.dev/api/films/1/, https://swapi.dev/api/films/5/, https://swapi.dev/api/films/6/)",male,"brown, grey",178,https://swapi.dev/api/planets/1/,120,Owen Lars,light,List(),List(),https://swapi.dev/api/people/6/,List()
47BBY,2014-12-10T15:53:41.121000Z,2014-12-20T21:17:50.319000Z,blue,"List(https://swapi.dev/api/films/1/, https://swapi.dev/api/films/5/, https://swapi.dev/api/films/6/)",female,brown,165,https://swapi.dev/api/planets/1/,75,Beru Whitesun lars,light,List(),List(),https://swapi.dev/api/people/7/,List()
unknown,2014-12-10T15:57:50.959000Z,2014-12-20T21:17:50.321000Z,red,List(https://swapi.dev/api/films/1/),,,97,https://swapi.dev/api/planets/1/,32,R5-D4,"white, red",List(https://swapi.dev/api/species/2/),List(),https://swapi.dev/api/people/8/,List()
24BBY,2014-12-10T15:59:50.509000Z,2014-12-20T21:17:50.323000Z,brown,List(https://swapi.dev/api/films/1/),male,black,183,https://swapi.dev/api/planets/1/,84,Biggs Darklighter,light,List(),List(https://swapi.dev/api/starships/12/),https://swapi.dev/api/people/9/,List()
57BBY,2014-12-10T16:16:29.192000Z,2014-12-20T21:17:50.325000Z,blue-gray,"List(https://swapi.dev/api/films/1/, https://swapi.dev/api/films/2/, https://swapi.dev/api/films/3/, https://swapi.dev/api/films/4/, https://swapi.dev/api/films/5/, https://swapi.dev/api/films/6/)",male,"auburn, white",182,https://swapi.dev/api/planets/20/,77,Obi-Wan Kenobi,fair,List(),"List(https://swapi.dev/api/starships/48/, https://swapi.dev/api/starships/59/, https://swapi.dev/api/starships/64/, https://swapi.dev/api/starships/65/, https://swapi.dev/api/starships/74/)",https://swapi.dev/api/people/10/,List(https://swapi.dev/api/vehicles/38/)


In [0]:
%sql describe formatted starwar_raw.people

col_name,data_type,comment
birth_year,string,
created,string,
edited,string,
eye_color,string,
films,array,
gender,string,
hair_color,string,
height,string,
homeworld,string,
mass,string,


In [0]:
#### Read the raw zone tables into respective dataframes

people=spark.sql("select * from people")
films=spark.sql("select * from films")
planets=spark.sql("select * from planets")
species=spark.sql("select * from species")
starships=spark.sql("select * from starships")


In [0]:
### Create a keys table to store key relationships between people, films, species and starships resources becuase these are one to many relationships with people
### Note that home planet has 1x1 relationship with people because its a string

keys_data = []
for i in people.collect():
    people_key = i["url"]
    
    for films in i["films"]:
        row = []
        row = [people_key,films]
        keys_data.append(row)
    for species in i["species"]:
        row = []
        row = [people_key,species]
        keys_data.append(row)
    for starships in i["starships"]:
        row = []
        row = [people_key,starships]
        keys_data.append(row)
keys_schema = ['people_key','related_key']

related_keys = spark.createDataFrame(keys_data,keys_schema).distinct()
spark.sql("drop table related_keys")
related_keys.write.format('delta').saveAsTable('related_keys')


In [0]:
%sql select * from starwar_raw.related_keys order by people_key;

people_key,related_key
https://swapi.dev/api/people/1/,https://swapi.dev/api/films/6/
https://swapi.dev/api/people/1/,https://swapi.dev/api/starships/12/
https://swapi.dev/api/people/1/,https://swapi.dev/api/films/1/
https://swapi.dev/api/people/1/,https://swapi.dev/api/films/3/
https://swapi.dev/api/people/1/,https://swapi.dev/api/films/2/
https://swapi.dev/api/people/1/,https://swapi.dev/api/starships/22/
https://swapi.dev/api/people/10/,https://swapi.dev/api/films/4/
https://swapi.dev/api/people/10/,https://swapi.dev/api/starships/64/
https://swapi.dev/api/people/10/,https://swapi.dev/api/starships/48/
https://swapi.dev/api/people/10/,https://swapi.dev/api/films/3/


In [0]:
###########################################################################################################
## FULL LOAD
## Assumption1 - only five fields are needed, and there corresponding source fields mapping is listed below 
##   1 Character Name  - name attribute from resource people
##   2 Film            - title attribute from resource film
##   3 Starship        - name attribute from resource starship 
##   4 Home Planet     - name attribute from resource planet
##   5 Language        - language attribute from resource species
## Assumption2 - Field Ind is for type of record. It will be
##               F - for full load
##               I - for Inserts means created records during the delta process
##               U - for Updates means updated recors during the delta process
## Assumption3 - Field URL from 'people' resource will be kept in final table for validation purpose.
##               If this is not needed, we will exclude it in next version
############################################################################################################
spark.sql("use starwar_raw")

characters = spark.sql("select distinct p.url, p.name as character_name, f.title as film,s.name as starship, ps.name as home_planet, sc.name as species, sc.language, 'F' as ind  from people p join related_keys k on trim(p.url) = trim(k.people_key) left join films f on trim(k.related_key) = f.url left join starships s on trim(k.related_key) = s.url left join planets ps on trim(p.homeworld) = trim(ps.url) left join species sc on trim(k.related_key) = trim(sc.url) order by p.url")

spark.sql("use starwar_curated")
spark.sql("drop table characters")
characters.write.format('delta').saveAsTable('characters')


In [0]:
%sql select * from characters order by url;

url,character_name,film,starship,home_planet,species,language,ind
https://swapi.dev/api/people/1/,Luke Skywalker,,Imperial shuttle,Tatooine,,,F
https://swapi.dev/api/people/1/,Luke Skywalker,Return of the Jedi,,Tatooine,,,F
https://swapi.dev/api/people/1/,Luke Skywalker,Revenge of the Sith,,Tatooine,,,F
https://swapi.dev/api/people/1/,Luke Skywalker,The Empire Strikes Back,,Tatooine,,,F
https://swapi.dev/api/people/1/,Luke Skywalker,,X-wing,Tatooine,,,F
https://swapi.dev/api/people/1/,Luke Skywalker,A New Hope,,Tatooine,,,F
https://swapi.dev/api/people/10/,Obi-Wan Kenobi,Revenge of the Sith,,Stewjon,,,F
https://swapi.dev/api/people/10/,Obi-Wan Kenobi,,Naboo star skiff,Stewjon,,,F
https://swapi.dev/api/people/10/,Obi-Wan Kenobi,The Empire Strikes Back,,Stewjon,,,F
https://swapi.dev/api/people/10/,Obi-Wan Kenobi,,Belbullab-22 starfighter,Stewjon,,,F


In [0]:
### Bonus quetions 1 ########################################################################################################
### # of other members of their species on the same ship
###  Assumption - 'their' means character's. So we need to get number of species of a characters on the same ship
###               We will need to join several tables to get this
###  Note in below query, the where clause filters non null values. It needs bigger data to get results. Current range is only 1 to 100.
###  Please see solution in the next cell
#############################################################################################################################



In [0]:
%sql select count(distinct character_name) as number_of_member,character_name, species, starship from starwar_curated.characters where species is not null and starship is not null group by character_name,species, starship 

number_of_member,character_name,species,starship
1,Yarael Poof,,
1,Poggle the Lesser,Geonosian,
1,Adi Gallia,,
1,Arvel Crynyd,,
1,Gasgano,Xexto,
1,Adi Gallia,Tholothian,
1,R4-P17,,
1,Sebulba,Dug,
1,Leia Organa,,
1,Tion Medon,Pau'an,


In [0]:
### Bonus quetions 2
### The characters rank by number of films (most to least), number of starships (most to least)
### Please see solution in the next few cells



In [0]:
%sql with films_numbers as ( select character_name, count(distinct film) as films_count from starwar_curated.characters group by character_name), starships_numbers as (select character_name, count(distinct starship) as starships_Count from starwar_curated.characters group by character_name) select f.character_name,f.films_count,s.starships_Count from films_numbers f join starships_numbers s on f.character_name=s.character_name order by films_count desc, starships_count desc

character_name,films_count,starships_Count
Obi-Wan Kenobi,6,5
R2-D2,6,0
C-3PO,6,0
Palpatine,5,0
Yoda,5,0
Chewbacca,4,2
Luke Skywalker,4,2
Darth Vader,4,1
Leia Organa,4,0
Padmé Amidala,3,3


In [0]:
### reset/changed run_time just for testing purpose for delta scenarios
run_time = '2010-12-09T13:50:51.644000Z'

In [0]:
f"select * from starwar_raw.people where created >= '{run_time}' or edited >= '{run_time}'"

Out[48]: "select * from starwar_raw.people where created >= '2010-12-09T13:50:51.644000Z' or edited >= '2010-12-09T13:50:51.644000Z'"

In [0]:
### Validated that at least some data is coming as delta. For testing purpose, delta date has been retrogressed back to '2010-06-20 21:36:09' to ensure this
spark.sql(f"select * from starwar_raw.people where created >= '{run_time}' or edited >= '{run_time}'").show()

+----------+--------------------+--------------------+---------+--------------------+-------------+-------------+------+--------------------+-------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+
|birth_year|             created|              edited|eye_color|               films|       gender|   hair_color|height|           homeworld|   mass|                name|      skin_color|             species|           starships|                 url|            vehicles|
+----------+--------------------+--------------------+---------+--------------------+-------------+-------------+------+--------------------+-------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+
|     19BBY|2014-12-09T13:50:...|2014-12-20T21:17:...|     blue|[https://swapi.de...|         male|        blond|   172|https://swapi.dev...|     77|      Luke Skywalker|            fa

In [0]:
run_time

Out[50]: '2010-12-09T13:50:51.644000Z'

In [0]:
###########################################################################################################
### DELTA LOAD
### for delta detection, we will use fields 'created' and 'edited' from each tables.
### Assumption is edited = U means update and created = I means insertion
############################################################################################################
### Framework configurations fields
load_type = 'delta'
raw_dbname = 'starwar_raw'
curated_dbname = 'starwar_curated'

# if load_type = 'delta'
spark.sql("use starwar_raw")

characters_delta = spark.sql(f"select p.url, p.name as character_name, f.title as film,s.name as starship, ps.name as home_planet, sc.language, 'I' as ind from people p join related_keys k on trim(p.url) = trim(k.people_key) left join films f on trim(k.related_key) = f.url left join starships s on trim(k.related_key) = s.url left join planets ps on trim(p.homeworld) = trim(ps.url) left join species sc on trim(k.related_key) = trim(sc.url) where p.created >= '{run_time}' or f.created >= '{run_time}' or s.created >= '{run_time}' or sc.created >= '{run_time}' or ps.created >= '{run_time}' union select p.url, p.name as character_name, f.title as film,s.name as starship, ps.name as home_planet, sc.language, 'U' as ind from people p join related_keys k on trim(p.url) = trim(k.people_key) left join films f on trim(k.related_key) = f.url left join starships s on trim(k.related_key) = s.url left join planets ps on trim(p.homeworld) = trim(ps.url) left join species sc on trim(k.related_key) = trim(sc.url) where p.edited >= '{run_time}' or  f.edited >= '{run_time}' or s.edited >= '{run_time}' or sc.edited >= '{run_time}' or ps.edited >= '{run_time}'")

characters_delta.show()
spark.sql("use starwar_curated")
#spark.sql("drop table characters_delta")
characters.write.format('delta').saveAsTable('characters_delta')


+--------------------+--------------+--------------------+----------------+-----------+--------+---+
|                 url|character_name|                film|        starship|home_planet|language|ind|
+--------------------+--------------+--------------------+----------------+-----------+--------+---+
|https://swapi.dev...|         R2-D2| Revenge of the Sith|            null|      Naboo|    null|  I|
|https://swapi.dev...|         C-3PO|                null|            null|   Tatooine|     n/a|  I|
|https://swapi.dev...|         R2-D2|The Empire Strike...|            null|      Naboo|    null|  I|
|https://swapi.dev...|Luke Skywalker|The Empire Strike...|            null|   Tatooine|    null|  I|
|https://swapi.dev...|         C-3PO|The Empire Strike...|            null|   Tatooine|    null|  I|
|https://swapi.dev...|Luke Skywalker|                null|Imperial shuttle|   Tatooine|    null|  I|
|https://swapi.dev...|         R2-D2|          A New Hope|            null|      Naboo|    

In [0]:
### performing upsert using merge statement. 
### This error is occuring because we manipulated the delta date. If the date is yesterday's date and if there is a true delta, this merge command will work.
spark.sql("merge into characters using characters_delta on characters.url = characters_delta.url when matched and characters_delta.ind = 'U' then update set * when not matched then insert *")

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-3735392585448498>[0m in [0;36m<cell line: 2>[0;34m()[0m
[1;32m      1[0m [0;31m### performing upsert using merge statement[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mspark[0m[0;34m.[0m[0msql[0m[0;34m([0m[0;34m"merge into characters using characters_delta on characters.url = characters_delta.url when matched and characters_delta.ind = 'U' then update set * when not matched then insert *"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/session.py[0m in [0;36msql[0;34m(self, sqlQuery, **kwargs)[0m
[1;32m   1097[0m             [0msqlQuery[0m [0;34m=[0m [0mformatter[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0msqlQuery[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m   109