## Data sourcing

Source data from various source systems and ingest them using python code.

1. Parquet files
2. CSV files
3. APIs
4. RDBMS databases
5. HTML

In [None]:
# import modules
import certifi
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sqlite3
import urllib3
from urllib3 import request
import requests
from unicodedata import normalize

### Sourcing Parquet data

Please visit the url https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

In [None]:
# Read data from the Parquet file. We use pandas read_parquet method for ease and speed.
df_parquet = pd.read_parquet("yellow_tripdata_2022-01.parquet")
df_parquet.head()

### Sourcing CSV data 

Please visit the url https://data.cityofnewyork.us/resource/h9gi-nx95.csv?$limit=500


In [None]:
# Read data from the CSV file. We use pandas read_csv method for ease and speed.
df_csv = pd.read_csv("h9gi-nx95.csv")
df_csv.head()

### Sourcing data from APIs

Please make sure to install the certifi library using - pipenv install certifi

In [None]:
# get api data from url
url = 'https://data.cityofnewyork.us/resource/h9gi-nx95.json?$limit=500'

# Check if API is available to retrive the data
apt_status = http.request('GET', url).status
print(apt_status)
if apt_status == 200:
    # Sometimes we get certificate error . We shoul never silence this error as this may cause a securirty threat.
    # Create a Pool manager that can be used to read the API response 
    http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED',ca_certs=certifi.where())
    data = json.loads(http.request('GET', url).data.decode('utf-8'))
    df_api = pd.json_normalize(data)
else:
    df_api = pd.Dataframe()
df_api.head(10)

### PISA API Testing

In [1]:
type = 'prod'

In [2]:
import pyspark,psycopg2
from pyspark.sql import SparkSession,SQLContext

spark = SparkSession.builder \
  .master("local[1]") \
  .appName("chapter6_schemas") \
  .config("spark.executor.memory", '3g') \
  .config("spark.executor.cores", '1') \
  .config("spark.cores.max", '1') \
  .config("spark.jars.packages","org.postgresql:postgresql:42.0.0") \
  .getOrCreate()

:: loading settings :: url = jar:file:/home/rstudio/workspaces/pisa2025-api-etl/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/rstudio/.ivy2/cache
The jars for the packages stored in: /home/rstudio/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e97aa8d6-0b2d-4a64-b82c-da7638604694;1.0
	confs: [default]
	found org.postgresql#postgresql;42.0.0 in central
downloading https://repo1.maven.org/maven2/org/postgresql/postgresql/42.0.0/postgresql-42.0.0.jar ...
	[SUCCESSFUL ] org.postgresql#postgresql;42.0.0!postgresql.jar(bundle) (1090ms)
:: resolution report :: resolve 4032ms :: artifacts dl 1093ms
	:: modules in use:
	org.postgresql#postgresql;42.0.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   1   |   1   |   0   ||

24/02/21 04:04:45 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
import sys
import requests
import json
import pandas as pd
import logging
from datetime import date

logging.captureWarnings(True)

##
##    function to obtain a new OAuth 2.0 token from the authentication server
##
def get_new_token(type: str):

    if(type=="staging"):
        auth_server_url = "https://auth-pisa-qa.staging.gcp-eu.taocloud.org/v1/oauth2/tokens"
        client_id = 'Q2caBAhwvNF8NgmrEPKrUw'
        client_secret = 'wHtKRe6eA7BWRBNIISiTQk3uM210OJdQ'
    elif(type=='prod'):
        auth_server_url = "https://auth-pisa-prod.prod.gcp-eu.taocloud.org/v1/oauth2/tokens"
        client_id = 'Amb9_PvLT4I2ui8flocJsA'
        client_secret = 'a8OIzcsHn0z8Z7mtYG9zfjXZotrIJQS1lNuEKEVSFHs'

    token_req_payload = {'grant_type': 'client_credentials'}

    token_response = requests.post(auth_server_url,
    data=token_req_payload, verify=False, allow_redirects=False,
    auth=(client_id, client_secret))
                
    if token_response.status_code !=200:
        print("Failed to obtain token from the OAuth 2.0 server", file=sys.stderr)
        sys.exit(1)

    print("Successfuly obtained a new token")
    tokens = json.loads(token_response.text)
    return tokens['access_token']

## 
## 	obtain a token before calling the API for the first time
##
token = get_new_token(type)


Successfuly obtained a new token


In [3]:
# API request function

def req_func(req_type,url,token,data=None):

##
##   call the API with the token
##
    
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + token
    }

    req_req = str.upper(req_type)

    response = requests.request(req_req, url, headers=headers, data=data)

    if	(response.status_code != 200):
        token = get_new_token(type)
        globals()['token'] = token
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + token
        }
        response = requests.request(req_req, url, headers=headers, data=data)


    return response


In [4]:
def api_url(type: str, endpoint: str):
  if(type=='staging'):
    url = 'https://dynamic-query-api-pisa-qa.staging.gcp-eu.taocloud.org/api/v1/' + endpoint
  elif(type=='prod'):
    url = 'https://dynamic-query-api-pisa-prod.prod.gcp-eu.taocloud.org/api/v1/' + endpoint

  return url

In [42]:
api_url_health = api_url(type=type,endpoint='health')
df_health = req_func(req_type = "get", url=api_url_health,token=token)
df_health.text

Successfuly obtained a new token


'{"status":"ok"}'

In [6]:
api_url_entity_list = api_url(type=type,endpoint='entity/list')
response = req_func(req_type = "get", url=api_url_entity_list,token=token)
df_entity_list = pd.json_normalize(json.loads(response.text))
df_entity_list

Unnamed: 0,name,description
0,portalSessions,Holds information on portal sessions
1,portalUserGroups,Holds information on portal user groups
2,portalGroups,Holds information on portal groups
3,portalUserSessions,Holds information on portal user sessions
4,portalUser,Holds information on portal user
5,portalBattery,Holds information on portal battery
6,testRunnerDeliveries,Holds information on test runner deliveries
7,datastoreDeliveries,Holds information on datastore deliveries
8,datastoreDeliveryResults,Holds information on datastore delivery results
9,datastoreItemResults,Holds information on datastore item results


In [7]:
def json_schema_get(entity,token):
    api_entity_get = api_url(type=type,endpoint=('entity/%s' % entity))
    df_entity_schema = req_func(req_type = "get",url=api_entity_get,token=token)
    return df_entity_schema

In [162]:
import pandas as pd

entities = df_entity_list['name']

with pd.ExcelWriter('all_entities_spec_' + type.upper() + '_' + '.xlsx') as writer:
    for ent in entities:    
        resp = json_schema_get(entity=ent,token=token)
        df = pd.json_normalize(resp.json())
        df.to_excel(writer,sheet_name = ent, index = False)
        worksheet = writer.sheets[ent]  # pull worksheet object
        for idx, col in enumerate(df):  # loop through all columns
            series = df[col]
            max_len = max((
                series.astype(str).map(len).max(),  # len of largest item
                len(str(series.name))  # len of column name/header
                )) + 1  # adding a little extra space
            worksheet.set_column(idx, idx, max_len)  # set column width

In [8]:
def api_json_extract(response,entity,options=None):

  json_raw = response.json()
  if(options['pandas'] is True):
    df_raw = pd.json_normalize(json_raw,record_path=['data'],max_level = 0)
    # if(entity == 'datastoreDeliveryResults'):
    #   df_raw = df_raw.rename(columns={'rawItems':'raw_data'})
  else: df_raw = json_raw

  # if(options is not None):
  #   if (entity == 'datastoreDeliveryResults'):
  #     if('json_var' in options):
  #       json_var = options['json_var']
  #       tmp = df_raw.iloc[0]['data'][0][json_var]
  #       df = (
  #         tmp
  #         .join(pd.json_normalize(pd.json_normalize(tmp['values'])[0]))
  #         .drop(columns=['values'])
  #       )
  #       df['varType'] = df['type'].str.split(pat = "#").str[1]
  #     elif(('item_resp' in options) & (options['item_resp'] is True)):
  #       tmp = (
  #           pd.json_normalize(df_raw.iloc[0]['data'][0])
  #           .filter(regex = '^(?!metadata)(?!ltiParameters)(?!rawLtiParameters)(?!outcomes)')
  #         )
      
  #       idvars = tmp.columns[~tmp.columns.str.startswith('items.item-')]
  #       pivotvars = tmp.columns[tmp.columns.str.startswith('items.item-')]
        
  #       out = (
  #         pd.melt(
  #             tmp,
  #             id_vars = idvars,
  #             var_name = 'key',
  #             value_name = 'q_val',
  #             value_vars = pivotvars
  #         )
  #         .assign(
  #           key=lambda df: df.key.replace('items.item-','',regex=True)
  #         )
  #         .assign(
  #           q_num = lambda df: df.key.str.split('.',n=1).str[0],
  #           q_lab = lambda df: df.key.str.split('.',n=1).str[1],
  #         )
  #         .drop(
  #           ['key'],
  #           axis = 1
  #         )
  #       )
  #   else:
  #     out = (
  #       pd
  #       .json_normalize(df_raw.iloc[0]['data'][0])
  #     )
  # else:
  #   out = json_raw

  return df_raw

In [9]:
def payload_ent(entity: str, pageNumber=None, pageSize=None, filters=None, fields=None,flatResponse=None, sort = None, searchAfter = None):

  payload_dict = {}

  if(pageSize is not None):
    if(isinstance(pageSize,int)):
      payload_dict['pageSize'] = pageSize

  if(filters is not None):
    if(isinstance(filters,list)):
      payload_dict['filters'] = filters

  response = {}
  if(flatResponse is not None):
    response['flatResponse'] = False
  if(fields is not None):
    response['fields'] = fields

  if(len(response) > 0):
    payload_dict['response'] = response

  if(sort is not None):
    if(len(sort) > 0):
      payload_dict['sort'] = sort
  
  if(pageNumber is not None):
    payload_dict['pageNumber'] = pageNumber

  if(searchAfter is not None):
    payload_dict['searchAfter'] = searchAfter

  return json.dumps(payload_dict)

In [57]:
filter_dict = {}
# filter_dict['login'] = [
#   'TSC7011','TSC7012','TSC7013','TSC7014','TSC7015','TSC7016','TSC7017','TSC7018','TSC7019','TSC7020','TSC7021','TSC7022','TSC7041','TSC7042','TSC7043','TSC7044','TSC7045','TSC7046','TSC7047','TSC7048','TSC7049','TSC7050','TSC7051','TSC7071','TSC7072','TSC7073','TSC7074','TSC7075','TSC7076','TSC7077','TSC7078','TSC7079','TSC7081','TSC7101','TSC7102','TSC7103','TSC7104','TSC7105','TSC7106','TSC7107','TSC7108','TSC7109','TSC7110','TSC7111','TSC7112','TSC7113',
# ]
filter_dict['deliveryId'] = [
  "e9fc52080732",
  "82c00e6ddbee",
  "641b1e05580d",
  "926ed4b705e4",
  "eecc20e16409"
]
# filter_dict['login'] = [
#   '18289999011','18289999001','18289999014','18289999017','18289999044','18289999081','18289999002','18289999020','18289999035','18289999029','18289999083','18289999003','18289999021','18289999085','18289999087','18289999091','18289999007','18289999015','18289999068','18289999008','18289999093','18289999094','18289999095','18289999009','18289999010','18289999026','18289999096','18289999097','18289999099','18289999032','18289999041',
# ]
filter_payload = [
  {"field": k, 'type':'terms', 'values': v} for k, v in filter_dict.items()
]
# deliveries = ["f8a87921b1e5"]
# deliveries = ["58e42b858a4b"]

In [58]:
if('login' in filter_dict):

  log = filter_dict['login']
  log_reverse = [x[::-1] for x in filter_dict['login']]
  pd.DataFrame(
    {
      "logins": log,
      "logins_reversed": log_reverse
    }
  ).to_excel('./data/ldw_logins.xlsx',index=False)

In [43]:
import datetime

epoch = datetime.datetime.utcfromtimestamp(0)
date_from = datetime.datetime(2024,2,4)
date_to = datetime.datetime(2024,2,23)
inc_date_to = True
inc_date_from = True

def unix_time_millis(dt):
    return int((dt - epoch).total_seconds() * 1000.0)

def unix_time_string(num: int):
  x = datetime.datetime.fromtimestamp(int(num)/1000).strftime("%Y-%m-%d %H:%M:%S")
  return x

def epoch_range(inc_date_to: bool, inclusive: str, date_from=None, date_to=None, periods = None):
    
  dates = pd.Series(pd.date_range(
    start = date_from,
    end = date_to,
    periods = periods,
    freq="12H",
    inclusive=inclusive
  )).apply(
    unix_time_millis
  )

  return dates

if(inc_date_to and inc_date_from):
  inclusive_term = 'both'
elif(inc_date_to and not inc_date_from):
  inclusive_term = 'right'
elif(inc_date_from and not inc_date_to):
  inclusive_term = 'left'
else:
  inclusive_term = 'neither'

dates_all = epoch_range(inc_date_to=False, inclusive=inclusive_term, date_from = date_from, date_to = date_to, periods = None)

In [12]:
ent_cols = {}

ent_cols['datastoreDeliveryResults']=[
  "deliveryId",
  "login",
  "deliveryExecutionId",
  "submissionTime",
  "testQtiId",
  "testQtiTitle",
  "testQtiLabel",
  "last_update_date",
  "sessionStartTime",
  "sessionEndTime",
  "responses",
  "outcomes",
  "isDeleted",
  "rawItems"
]
ent_cols['datastoreDeliveries'] = [
  "deliveryId",
  "cutScoreRatio",
  "id",
  "items",
  "last_update_date",
  "messageId",
  "metadata",
  "metadataRaw",
  "outcomeDeclarations",
  "publicationTime",
  "qtiId",
  "qtiPackage",
  "qtiTitle",
  "structure",
  "tenantId",
  "testQtiId",
  "testQtiLabel",
  "testQtiTitle",
]
ent_cols['datastoreUIEvents'] = [
  "deliveryExecutionId",
  "batteryId",
  "itemId",
  "deliveryId",
  "domEventType",
  "tenantId",
  "responseId",
  "timestamp",
  "last_update_date",
  "_id",
  "metadata",
  "metadataRaw"
]

In [44]:
import math

entities = [
  # 'datastoreDeliveryResults',
  'datastoreUIEvents',
  # 'datastoreDeliveries'
]

time_var = 'timestamp'

resp_df = False

ent_df = {}
log_df = {}

for ent in entities:
  url = api_url(type=type,endpoint=('search/%s' % ent))

  fields = ent_cols[ent]

  sort = [
    {
      "type": "ASC",
      "field": time_var
    }
  ]

  filters_cnt=filter_payload + [{k: v for k, v in (('field', time_var), ('type', 'range'),('fromValue',unix_time_millis(date_from)),('toValue',unix_time_millis(date_to)),('toValueIncluded',False)) if v is not None}]
  
  payload_cnt = payload_ent(entity=ent, filters=filters_cnt, sort=sort, pageSize = 0)
  resp_cnt = req_func(req_type='post',url=url,token=token, data=payload_cnt)

  datas = []
  last_id = []
  log = []
  dates = dates_all.to_list()
  for idx, x in enumerate(dates):

    filters=filters_cnt + [{
      "type": "range",
      "field": time_var,
      "toValue": x
    }]
    
    if(idx != 0 and len(last_id) > 0):
      searchAfter = last_id
      payload = payload_ent(entity=ent, sort=sort, filters=filters, flatResponse=False,fields=None,searchAfter=searchAfter,pageSize=5000)
    else:
      payload = payload_ent(entity=ent, sort=sort, filters=filters, flatResponse=False,fields=None,pageSize=5000)
    
    resp = req_func(req_type='post',url=url,token=token, data=payload)
    resp_last_id = resp.json()['lastId']
    
    if(len(resp_last_id)>0):
      last_id = resp_last_id
      log_resp = pd.DataFrame([last_id]).rename({0:'lastId_' + time_var,1:'lastId_id'},axis = 1)
      df = api_json_extract(resp, entity = ent, options = {'pandas': resp_df})
      log_resp['num'] = len(df['data'])
      log_resp[time_var] = int(x)
      datas.append(df)
      log.append(log_resp)
    else:
      last_id = last_id
  
  ent_df[ent] = datas.copy()
  if(len(log) > 0):
    log_df[ent] = pd.concat(log).reset_index(drop=True)

Successfuly obtained a new token


In [38]:
import copy
def json_concat(dat_list):
  json_obj = copy.deepcopy(dat_list[0])

  if(len(dat_list) > 1):
    for d in dat_list[1:]:
      json_obj['data'].extend(d['data'])

  json_obj['totalResults'] = len(json_obj['data'])

  return json_obj

def json_to_pd(json_raw):
  df_raw = pd.json_normalize(json_raw,record_path=['data'],max_level = 0)
  return df_raw

In [45]:
pd.set_option('display.width',200)
logs = log_df['datastoreUIEvents']
logs['index'] = logs[time_var].apply(lambda x: dates_all[dates_all==x].index[0])
logs['date_of_call']=logs[time_var].apply(lambda x: unix_time_string(x))
print(logs)
print('Total records in data file: ' + str(logs.num.sum()))
print('Total records according to API: ' + str(resp_cnt.json()['totalResults']))

    lastId_timestamp                             lastId_id   num      timestamp  index         date_of_call
0      1707339734808  03dd5e33-0f82-4052-90f4-8eb45a95a876   104  1707350400000      8  2024-02-08 00:00:00
1      1707393597841  e670f27e-b92c-4334-ac78-fcba19d7c53a   511  1707393600000      9  2024-02-08 12:00:00
2      1707394507778  18501f32-b58f-4c72-ad8b-f23454e9f7c9   570  1707436800000     10  2024-02-09 00:00:00
3      1707577315820  b36d44fa-a3e3-4cb7-81ec-edfaa0b50ea1  2302  1707609600000     14  2024-02-11 00:00:00
4      1707757246323  82ad1f2e-a5c1-43eb-a9a1-395683c2102a    10  1707782400000     18  2024-02-13 00:00:00
5      1707836350869  6faab632-64f5-4281-9978-17c15da25bf9    10  1707868800000     20  2024-02-14 00:00:00
6      1707904113589  409b0773-a887-43f9-9d42-53980b5a231c  2428  1707912000000     21  2024-02-14 12:00:00
7      1707932370390  7a62693b-8d2c-4ae1-bd89-b973e3715af0  1987  1707955200000     22  2024-02-15 00:00:00
8      1707989176839  920ff6

In [46]:
for k, v in ent_df.items():
  json_out = json_concat(v)
  filename = './data/' + k + '_' + type.upper() + '_' + date.today().strftime('%Y%m%d') + '.json'
  string = json.dumps(json_out)
  json_data = json.loads(
    string
      .replace("\\t", "\\\\t")
      .replace("\\n", "\\\\n")
  )
  with open(filename, 'w') as output_data:
    output_data.write(
      json.dumps(json_data, ensure_ascii=False)
    )

  df_raw = json_to_pd(json_data)
  df_raw.to_excel('./data/' + k + '_' + type.upper() + '_' + date.today().strftime('%Y%m%d') + '.xlsx',index=False)

In [210]:
import psycopg2
import psycopg2.extras
from etl.create_table import *
from etl.psycopg2_database_helper import *

params = {
  'host': 'localhost',
  'database': 'postgres',
  'user': 'postgres',
  'password': 'postgres',
  'port': 5432
}

conn = get_postgres_connection(
  host=params['host'],
  database=params['database'],
  user = params['user'],
  password = params['password'],
  port = params['port']
)

cursor = conn.cursor()
conn.autocommit = True

sql = '''CREATE DATABASE "RawDB" WITH OWNER = postgres ENCODING = 'UTF8' LOCALE_PROVIDER = 'libc' CONNECTION LIMIT = -1 IS_TEMPLATE = False;'''

# # Executing the query 
# cursor.execute(sql) 
# sql = '''
#         CREATE TABLE delivery_results (
#             delivery_execution_id VARCHAR(100) PRIMARY KEY,
#             delivery_id character(12) NOT NULL,
#             is_deleted BOOL NOT NULL,
#             last_update_date BIGINT NOT NULL,
#             login VARCHAR(50) NOT NULL,
#             test_qti_id VARCHAR(255) NOT NULL,
#             test_qti_label VARCHAR(255) NOT NULL,
#             test_qti_title VARCHAR(255) NOT NULL,
#             raw_data TEXT NOT NULL
#         );
#         '''
# cursor.execute(sql) 

sqlfile = open('./data/schema.sql','r')
cursor.execute(sqlfile.read())

# close communication with the PostgreSQL database server
cursor.close()
# commit the changes
conn.commit()

In [201]:
import etl.postgresqlschemareader as pgsr

pgsr_tables = pgsr.get_tables(conn)
pgsr_cols = pgsr.get_columns(conn,'public','delivery_results')
pgsr_tree = pgsr.get_tree(conn)

pgsr.print_tree(pgsr_tree)
pgsr.print_columns(pgsr_cols)

oat.deliveries
 |-rowId (bigint)
 |-id (character varying)
 |-testQtiId (character varying)
 |-testQtiLabel (character varying)
 |-testQtiTitle (character varying)
 |-rawData (json)
 |-insertedDate (timestamp with time zone)
oat.deliveryExecutions
 |-rowId (bigint)
 |-deliveryExecutionId (character varying)
 |-contextId (character varying)
 |-login (character varying)
 |-lastUpdateDate (timestamp with time zone)
 |-status (character varying)
 |-finishTime (time with time zone)
 |-rawData (json)
oat.deliveryResults
 |-rowId (bigint)
 |-deliveryExecutionId (character varying)
 |-batteryId (character varying)
 |-deliveryId (character varying)
 |-login (character varying)
 |-lastUpdateDate (timestamp with time zone)
 |-isDeleted (bit)
 |-testQtiId (character varying)
 |-testQtiLabel (character varying)
 |-testQtiTitle (character varying)
 |-insertedDate (timestamp with time zone)
 |-rawData (json)


In [16]:
def get_dat(json):
  meta_cols = [
    # 'battery_id',
    'delivery_execution_id',
    'delivery_id',
    'isDeleted',
    'last_update_date',
    'login',
    'test_qti_id',
    'test_qti_label',
    'test_qti_title',
    'raw_data'
  ]

  dat = (
    pd.json_normalize(
      resp,record_path = 'data',max_level=0
    )
    .rename(
      columns={
        'batteryId':'battery_id',
        'deliveryExecutionId':'delivery_execution_id',
        'deliveryId':'delivery_id',
        'testQtiId':'test_qti_id',
        'testQtiLabel':'test_qti_label',
        'testQtiTitle':'test_qti_title',
        'items':'raw_data'
      }
    )
    .filter(
      # regex = "|".join(meta_cols[0:-1]) + '|^items\.',
      regex = "|".join(meta_cols),
      axis=1
    )
  )

  return dat

In [17]:
entities = [
  'datastoreDeliveryResults',
  # 'datastoreUIEvents',
  # 'datastoreDeliveries'
]

del_df = {}

for ent in entities:
  url = api_url(type=type,endpoint=('search/%s' % ent))

  filters=[
    {
      "field": "deliveryId",
      "type": "terms",
      "values": deliveries
    }
  ]
  # pageSize=None
  # filters=None

  payload_cnt = payload_ent(entity=ent, filters=filters, pageSize = 0)

  resp_cnt = req_func(req_type='post',url=url,token=token, data=payload_cnt)
  total_records = resp_cnt.json()['totalResults']

  datas = []
  for i in range(math.ceil(total_records/10000)):
    print(i)
    payload = payload_ent(entity=ent, pageSize=10000, filters=filters, pageNumber=i)
    resp = req_func(req_type='post',url=url,token=token, data=payload)
    df = api_json_extract(resp, entity = ent, options = {'pandas': False})
    datas.append(df)

  del_df[ent] = datas


0


In [17]:
for k,v in del_df.items():
  filename = './data/' + 'all_delivery_results_' + type.upper() + '_' + date.today().strftime('%Y%m%d') + '.json'
  string = json.dumps(v)
  json_data = json.loads(
    string
      .replace("\\t", "\\\\t")
      .replace("\\n", "\\\\n")
  )
  with open(filename, 'w') as output_data:
    output_data.write(
      json.dumps(json_data, ensure_ascii=False)
    )

In [221]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from psycopg2.extras import Json
from psycopg2.extensions import register_adapter

register_adapter(dict, Json)

all_deliveries_results['outcomes'] = all_deliveries_results['outcomes'].apply(lambda x: json.dumps(x))
all_deliveries_results['rawItems'] = all_deliveries_results['rawItems'].apply(lambda x: json.dumps(x))

df_json = spark.createDataFrame(all_deliveries_results) \
  .withColumnRenamed(
    'rawItems','raw_data'
  )

df_json.printSchema()

upsert_spark_df_to_postgres(
  dataframe_to_upsert=df_json,
  table_name='oat."deliveryResults"',
  table_unique_key=['deliveryExecutionId'],
  database_credentials=params
)

root
 |-- deliveryExecutionId: string (nullable = true)
 |-- sessionEndTime: long (nullable = true)
 |-- sessionStartTime: long (nullable = true)
 |-- login: string (nullable = true)
 |-- testQtiId: string (nullable = true)
 |-- testQtiTitle: string (nullable = true)
 |-- submissionTime: long (nullable = true)
 |-- deliveryId: string (nullable = true)
 |-- isDeleted: boolean (nullable = true)
 |-- outcomes: string (nullable = true)
 |-- testQtiLabel: string (nullable = true)
 |-- raw_data: string (nullable = true)
 |-- last_update_date: long (nullable = true)
 |-- _id: string (nullable = true)



24/02/05 03:09:53 WARN TaskSetManager: Stage 10 contains a task of very large size (58580 KiB). The maximum recommended task size is 1000 KiB.
[Stage 10:>                                                         (0 + 1) / 1]


#################################################
 Total records loaded - 1000
 Total records rejected - 1000
#################################################

 Started Printing Error Messages ....
['column "deliveryexecutionid" of relation "deliveryResults" does not exist\nLINE 1:  INSERT INTO oat."deliveryResults" (deliveryExecutionId, ses...\n                                            ^\n', 'column "deliveryexecutionid" of relation "deliveryResults" does not exist\nLINE 1:  INSERT INTO oat."deliveryResults" (deliveryExecutionId, ses...\n                                            ^\n', 'column "deliveryexecutionid" of relation "deliveryResults" does not exist\nLINE 1:  INSERT INTO oat."deliveryResults" (deliveryExecutionId, ses...\n                                            ^\n', 'column "deliveryexecutionid" of relation "deliveryResults" does not exist\nLINE 1:  INSERT INTO oat."deliveryResults" (deliveryExecutionId, ses...\n                                            ^\n', 'co

                                                                                

In [229]:
import glob, re
from pyspark.sql.functions import *
from pyspark.sql.types import *

json_files = glob.glob("./data/datastoreDeliveryResults.json")
# json_files = [x for x in json_files if 'db328a265d14' in x]

for f in json_files:
  df_json = spark.read.option("multiline","true") \
    .json(f) \
    .select(
      '*',
      explode('data').alias("dataExplode")
    ) \
    .select("dataExplode.*") \
    .select(
      '*',
      explode('raw_data').alias("raw_dataExplode")
    ) \
    .select("raw_dataExplode.*") \
    .withColumn(
      'raw_data',
      to_json('raw_data')
    )
      # .selectExpr(
    #   'deliveryExecutionId as delivery_execution_id',
    #   'deliveryId as delivery_id',
    #   'isDeleted as is_deleted',
    #   'last_update_date',
    #   'login',
    #   'testQtiid as test_qti_id',
    #   'testQtiLabel as test_qti_label',
    #   'testQtiTitle as test_qti_title',
    #   'items as raw_data'
    # ) \
    
  
  df_json.show()
  df_json.printSchema()

  # upsert_spark_df_to_postgres(
  #   dataframe_to_upsert=df_json,
  #   table_name='delivery_results',
  #   table_unique_key=['delivery_execution_id'],
  #   database_credentials=params
  # )

In [24]:
# del_keys = list(del_df)
# raw_data = pd.DataFrame()
# raw_data_all = []
# for f in del_keys:
#   dat = get_dat(api_json_extract(del_df[f], entity = None, options = {'pandas': False}))
#   raw_data_all.append(dat)

# raw_data = pd.concat(raw_data_all,axis=0,ignore_index=True)

raw_data = all_deliveries_results
raw_data.head(5)

Unnamed: 0,metadata,sessionEndTime,metadataRaw,login,testQtiId,testQtiTitle,submissionTime,duration,score,deliveryId,...,sessionStartTime,maxScore,rawLtiParameters,testQtiLabel,rawItems,publicationTime,tenantId,items,_id,messageId
0,"{'PISA25 Domains': 'https://www.oecd.org/STQ',...",1706008452000,"[{'propertyLabel': 'Label', 'values': [{'vuri'...",QQ3,ZOVLEOIO,StQ,1706008452000,5,0,87040b6da779,...,1706008447000,0,"{'tenant_id': '18', 'sub': 'QQ3', 'https://pur...",StQ-development,"[{'numAttempts': 1, 'rawOutcomes': [{'identifi...",1706008275000,18,"{'ST410': {'numAttempts': 1, 'statusCorrect': ...",M1FRIzg3MDQwYjZkYTc3OSMwYTkyZmFiMzIzMDEzNGNjYT...,
1,"{'PISA25 Domains': 'https://www.oecd.org/SCI',...",1706085753000,"[{'propertyLabel': 'Label', 'values': [{'vuri'...",CG-MNE20,HZTYIRAY,Science Coding Guide Items - Trend,1706085753000,307,0,a048b08baeed,...,1706085446000,0,"{'tenant_id': '18', 'sub': 'CG-MNE20', 'https:...",Science Coding Guide Items - Trend,"[{'numAttempts': 2, 'rawOutcomes': [{'identifi...",1705594156000,18,"{'item-9': {'numAttempts': 1, 'statusCorrect':...",MDJFTk0tR0MjYTA0OGIwOGJhZWVkIzRmODc1YzlmNGE4YW...,
2,"{'PISA25 Domains': 'https://www.oecd.org/SCI',...",1706865971000,"[{'propertyLabel': 'Label', 'values': [{'vuri'...",CG-HKG20,HZTYIRAY,Science Coding Guide items - New,1706865971000,101958,2,7f5275e566c9,...,1706764013000,41,"{'tenant_id': '18', 'sub': 'CG-HKG20', 'https:...",Science Coding Guide Items - New,"[{'numAttempts': 1, 'rawOutcomes': [{'identifi...",1705589404000,18,"{'item-18': {'numAttempts': 1, 'statusCorrect'...",MDJHS0gtR0MjN2Y1Mjc1ZTU2NmM5IzJhNTgyMjMxMzFlNW...,
3,"{'PISA25 Domains': 'https://www.oecd.org/SCI',...",1706756096000,"[{'propertyLabel': 'Label', 'values': [{'vuri'...",CG-LUX20,HZTYIRAY,Science Coding Guide Items - Trend,1706756096000,477,0,a048b08baeed,...,1706755619000,0,"{'tenant_id': '18', 'sub': 'CG-LUX20', 'https:...",Science Coding Guide Items - Trend,"[{'numAttempts': 2, 'rawOutcomes': [{'identifi...",1705594156000,18,"{'item-9': {'numAttempts': 1, 'statusCorrect':...",MDJYVUwtR0MjYTA0OGIwOGJhZWVkIzRmODc1YzlmNGE4YW...,
4,"{'PISA25 Group': 'batch1', 'PISA25 Is Translat...",1707084391000,"[{'propertyLabel': 'Label', 'values': [{'vuri'...",TSC6030,CUSAS002,Fuel to Minimise Carbon Dioxide (CUSAS002),1707084391000,23,0,a0ccf30faf32,...,1707084368000,4,"{'sub': 'TSC6030', 'https://purl.imsglobal.org...",Fuel to Minimise Carbon Dioxide (CUSAS002),"[{'numAttempts': 1, 'rawOutcomes': [{'identifi...",1706479391000,18,"{'item-4': {'numAttempts': 1, 'statusCorrect':...",MDMwNkNTVCNhMGNjZjMwZmFmMzIjYWRhNjZlZGE3M2YzYT...,1.0342833531691882e+16


In [69]:
import great_expectations as gx

raw_data = gx.from_pandas(raw_data)

In [70]:
def validateJSON(jsonData):
  try:
    json.loads(json.dumps(jsonData))
  except ValueError as err:
    return False
  return True

raw_data['valid_json'] = raw_data.apply(
  lambda d: validateJSON(d['items']),
  axis = 1
)

raw_data.head(5)
raw_data.expect_column_distinct_values_to_be_in_set(
  column = 'valid_json',
  value_set = [True]
)

{
  "success": true,
  "result": {
    "observed_value": [
      true
    ],
    "element_count": 1228,
    "missing_count": null,
    "missing_percent": null
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [34]:
def json_key_item(jsonData):
  try:
    if(isinstance(jsonData,list)):
      check_list = []
      for i in jsonData:
        keys = list(i.keys())
        check_key = all(('item-' in s) for s in keys)
        check_list.append(check_key)
      check=all(check_list)
    elif(isinstance(jsonData,dict)):
      keys = list(jsonData.keys())
      check = all(('item-' in s) for s in keys)
  except ValueError as err:
    return False
  return check

raw_data['item_keys'] = raw_data.apply(
  lambda d: json_key_item(d['items']),
  axis = 1
)

raw_data.head()
raw_data.expect_column_distinct_values_to_be_in_set(
  column = 'item_keys',
  value_set = [True]
)

{
  "success": false,
  "result": {
    "observed_value": [
      false,
      true
    ],
    "element_count": 1228,
    "missing_count": null,
    "missing_percent": null
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [None]:
array = raw_data.items[0]
# keys = raw_data.items[0].keys()
# values = [raw_data.items[0][lab]['maxScore'] for lab in keys]
# values = [json.loads(raw_data.raw_data[0][lab]['responses']['RESPONSE']['value'])['ts'] for lab in keys]
# res = dict(map(lambda i,j : (i,j) , keys,values))
# l = [isinstance(s,str) for k,s in res.items()]
# all(l)

In [35]:
result_list = []
for k, v in raw_data.items.items():
  qti_label = list(pd.json_normalize(raw_data.raw_data[0],max_level=0).transpose().apply(
    lambda df: df[0]['qtiLabel']
  ))
  result_list.append(qti_label)

AttributeError: 'function' object has no attribute 'items'

In [72]:
from schemadict import schemadict, STANDARD_VALIDATORS

my_validators = STANDARD_VALIDATORS

my_validators['$required_keys'] = [
  'numAttempts',
  'statusCorrect',
  'qtiTitle',
  'maxScore',
  'submissionTime',
  'qtiId',
  'qtiLabel',
  'duration',
  'score',
  'itemEndTime',
  'outcomes',
  'responses',
  'completionStatus',
  'itemStartTime'
]

schema_outcome = schemadict({
  # "SCORE": {"type": str},
  "completionStatus": {"type": str}
})

schema_response = schemadict({
  "correct": {"type": bool},
  "value": {"type": str}
})

schema = schemadict(
  {
    "numAttempts": {"type": int, '>=': 0},
    "statusCorrect": {"type": str},
    'qtiTitle': {"type": str},
    # 'maxScore': {"type": str},
    'submissionTime': {"type": int},
    'qtiId': {"type": str},
    'qtiLabel': {"type": str},
    'duration': {"type": int},
    # 'score': {"type": int},
    'itemEndTime': {"type": int},
    'outcomes': {
      "type": dict,
      'item_type': dict,
      'item_schemadict': schema_outcome
    },
    'responses': {
      "type": dict,
      'item_type': dict,
      'item_schemadict': schema_response
    },
    'completionStatus': {"type": str},
    'itemStartTime' : {"type": int}
  },
  validators=my_validators
)

schema.keys()


KeysView(schemadict({'numAttempts': {'type': <class 'int'>, '>=': 0}, 'statusCorrect': {'type': <class 'str'>}, 'qtiTitle': {'type': <class 'str'>}, 'submissionTime': {'type': <class 'int'>}, 'qtiId': {'type': <class 'str'>}, 'qtiLabel': {'type': <class 'str'>}, 'duration': {'type': <class 'int'>}, 'itemEndTime': {'type': <class 'int'>}, 'outcomes': {'type': <class 'dict'>, 'item_type': <class 'dict'>, 'item_schemadict': schemadict({'completionStatus': {'type': <class 'str'>}})}, 'responses': {'type': <class 'dict'>, 'item_type': <class 'dict'>, 'item_schemadict': schemadict({'correct': {'type': <class 'bool'>}, 'value': {'type': <class 'str'>}})}, 'completionStatus': {'type': <class 'str'>}, 'itemStartTime': {'type': <class 'int'>}}))

In [90]:

# Describe what kind of json you expect.
raw_resp_schema = {
  "type": "object",
  "properties": {
    "numAttempts": {"type": "number"},
    "statusCorrect": {"type": "string"},
    'qtiTitle': {"type": "string"},
    # 'maxScore': {"type": "number"},
    'submissionTime': {"type": "number"},
    'qtiId': {"type": "str"},
    'qtiLabel': {"type": "str"},
    'duration': {"type": "number"},
    # 'score': {"type": "number"},
    'itemEndTime': {"type": "number"},
    'outcomes': {
      "type": "object",
      "properties": {
        #  "SCORE": {"type": "number"},
         "completionStatus": {"type": "string"}
      },
      "required": ['SCORE','completionStatus']
    },
    'responses': {
      "type": "object",
      "properties": {
        "RESPONSE": {
          "type": "object",
          "properties":{
             "correct": {"type": "boolean"},
            #  "value": {"type": "string"}
          },
          "required": ['correct','value']
        },
        "completionStatus": {"type": "string"},
        "itemStartTime": {"type": "number"}
      }
    },
    'completionStatus': {"type": "string"},
    'itemStartTime' : {"type": "number"},
  },
  "required": ['numAttempts',
    'statusCorrect',
    'qtiTitle',
    'maxScore',
    'submissionTime',
    'qtiId',
    'qtiLabel',
    'duration',
    'score',
    'itemEndTime',
    'outcomes',
    'responses',
    'completionStatus',
    'itemStartTime'
  ]
}


In [73]:
import jsonschema
from jsonschema import validate

def validateJsonSchema(jsonData,schema):
    try:
      if(isinstance(jsonData,list)):
        all([schema.validate(v) for v in jsonData])
      elif(isinstance(jsonData,dict)):
        all([schema.validate(v) for k,v in jsonData.items()])
    except TypeError as err:
      return str(err)
    return "Valid"

raw_data['item_json_schema'] = raw_data.apply(
  lambda d: validateJsonSchema(d['items'],schema),
  axis = 1
)

raw_data.head(5)
raw_data.expect_column_distinct_values_to_be_in_set(
  column = 'item_json_schema',
  value_set = ['Valid']
)

{
  "success": true,
  "result": {
    "observed_value": [
      "Valid"
    ],
    "element_count": 1228,
    "missing_count": null,
    "missing_percent": null
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [77]:
outcome = schema.validate(list(raw_data['items'][0].values())[0])
print(outcome)

None


In [None]:
array = raw_data.raw_data[0].values()
keys = list(raw_data.raw_data[0].keys())
values = [raw_data.raw_data[0][lab]['responses'] for lab in keys]
# values = [json.loads(raw_data.raw_data[0][lab]['responses']) for lab in keys]
res = dict(map(lambda i,j : (i,j) , keys,values))
# l = [isinstance(s,str) for k,s in res.items()]
# all(l)

In [None]:
print("Valid JSON: " + str(all(raw_data.valid_json)))
print("Items as keys: " + str(all(raw_data.item_keys)))
print("Correct JSON Schema: " + str(all(raw_data.item_json_schema)))

In [99]:
df = raw_data.explode('items').reset_index(drop = True)

In [100]:
df.head()

Unnamed: 0,metadata,sessionEndTime,metadataRaw,login,testQtiId,testQtiTitle,submissionTime,duration,score,deliveryId,...,testQtiLabel,rawItems,publicationTime,tenantId,items,_id,messageId,valid_json,item_keys,item_json_schema
0,"{'PISA25 Domains': 'https://www.oecd.org/STQ',...",1706008452000,"[{'propertyLabel': 'Label', 'values': [{'vuri'...",QQ3,ZOVLEOIO,StQ,1706008452000,5,0,87040b6da779,...,StQ-development,"[{'numAttempts': 1, 'rawOutcomes': [{'identifi...",1706008275000,18,ST410,M1FRIzg3MDQwYjZkYTc3OSMwYTkyZmFiMzIzMDEzNGNjYT...,,True,False,Valid
1,"{'PISA25 Domains': 'https://www.oecd.org/STQ',...",1706008452000,"[{'propertyLabel': 'Label', 'values': [{'vuri'...",QQ3,ZOVLEOIO,StQ,1706008452000,5,0,87040b6da779,...,StQ-development,"[{'numAttempts': 1, 'rawOutcomes': [{'identifi...",1706008275000,18,ST411,M1FRIzg3MDQwYjZkYTc3OSMwYTkyZmFiMzIzMDEzNGNjYT...,,True,False,Valid
2,"{'PISA25 Domains': 'https://www.oecd.org/SCI',...",1706085753000,"[{'propertyLabel': 'Label', 'values': [{'vuri'...",CG-MNE20,HZTYIRAY,Science Coding Guide Items - Trend,1706085753000,307,0,a048b08baeed,...,Science Coding Guide Items - Trend,"[{'numAttempts': 2, 'rawOutcomes': [{'identifi...",1705594156000,18,item-9,MDJFTk0tR0MjYTA0OGIwOGJhZWVkIzRmODc1YzlmNGE4YW...,,True,True,Valid
3,"{'PISA25 Domains': 'https://www.oecd.org/SCI',...",1706085753000,"[{'propertyLabel': 'Label', 'values': [{'vuri'...",CG-MNE20,HZTYIRAY,Science Coding Guide Items - Trend,1706085753000,307,0,a048b08baeed,...,Science Coding Guide Items - Trend,"[{'numAttempts': 2, 'rawOutcomes': [{'identifi...",1705594156000,18,item-18,MDJFTk0tR0MjYTA0OGIwOGJhZWVkIzRmODc1YzlmNGE4YW...,,True,True,Valid
4,"{'PISA25 Domains': 'https://www.oecd.org/SCI',...",1706085753000,"[{'propertyLabel': 'Label', 'values': [{'vuri'...",CG-MNE20,HZTYIRAY,Science Coding Guide Items - Trend,1706085753000,307,0,a048b08baeed,...,Science Coding Guide Items - Trend,"[{'numAttempts': 2, 'rawOutcomes': [{'identifi...",1705594156000,18,item-8,MDJFTk0tR0MjYTA0OGIwOGJhZWVkIzRmODc1YzlmNGE4YW...,,True,True,Valid


In [95]:
tmp = (
  raw_data
  .join(
    pd.json_normalize(
      raw_data['items'],
      max_level = 0
    )
  )
  .drop(
    ['items'],
    axis = 1
  )
)

stub_cols = tmp.columns[tmp.columns.str.startswith("item-")]
id_cols = set(tmp.columns) - set(stub_cols)

raw_data_melt = tmp.melt(
  var_name = "items",
  value_name= "vars",
  id_vars = id_cols,
  value_vars = stub_cols
)

raw_data_melt.head(5)

Unnamed: 0,ST490,cluster2-CACERL009-item-12,ST804QCY,cluster1-CACERL002-item-9,cluster2-CACERL007-item-11,cluster2-CACERL008-item-3,cluster2-CACERL008-item-11,TC146FL,TC161FL,ST801FLDEU,...,maxScore,cluster2-CACERL007-item-9,cluster2-CPERS102-item-4,TC114FL,last_update_date,cluster2-S428-item1,cluster1-CACERL006-item-14,tenantId,items,vars
0,,,,,,,,,,,...,0,,,,1706008455379,,,18,item-9,
1,,,,,,,,,,,...,0,,,,1706085756771,,,18,item-9,"{'numAttempts': 1, 'statusCorrect': 'incorrect..."
2,,,,,,,,,,,...,41,,,,1706865974943,,,18,item-9,"{'numAttempts': 1, 'statusCorrect': 'correct',..."
3,,,,,,,,,,,...,0,,,,1706756098586,,,18,item-9,"{'numAttempts': 1, 'statusCorrect': 'incorrect..."
4,,,,,,,,,,,...,4,,,,1707084393799,,,18,item-9,


In [120]:
meta_cols = ['deliveryExecutionId','deliveryId','login']

s = raw_data.set_index(meta_cols)['items']
out = pd.DataFrame(s.tolist(),index = s.index).reset_index()

# tmp.head(5)

# stub_cols = tmp.columns[tmp.columns.str.startswith("item-")]
stub_cols = set(tmp.columns) - set(meta_cols)

raw_data_melt = tmp.melt(
  var_name = "items",
  value_name= "vars",
  id_vars = meta_cols,
  value_vars = stub_cols
).dropna()

df = raw_data_melt['vars'].apply(pd.Series)

result = pd.concat([raw_data_melt,df],axis = 1).drop('vars',axis = 1)

result.head(5)

Unnamed: 0,deliveryExecutionId,deliveryId,login,items,numAttempts,statusCorrect,qtiTitle,maxScore,submissionTime,qtiId,...,spec_lti_claim_custom_deliverySettings_plugins_disableCommands_enabled,spec_lti_claim_custom_deliverySettings_plugins_preventScreenshot_enabled,spec_lti_claim_custom_deliverySettings_plugins_forceFullScreen_enabled,spec_lti_claim_custom_deliverySettings_plugins_pauseOnBlur_autoresume,spec_lti_claim_custom_deliverySettings_attemptId,spec_lti_claim_custom_portalLaunch,spec_lti_claim_custom_deliverySettings_plugins_pauseOnBlur_enabled,SCORE_TOTAL_MAX,SCORE_RATIO,SCORE_TOTAL
2,02GKH-GC#7f5275e566c9#2a58223131e5a64a6a646b29...,7f5275e566c9,CG-HKG20,item-80,1.0,correct,CPERS102Q04,0,1706866000000.0,i63a147ebd300c26079455ec3fed58aa170,...,,,,,,,,,,
51,10EHC-GC#7f5275e566c9#2a58223131e5a64a6a646b29...,7f5275e566c9,CG-CHE01,item-80,1.0,correct,CPERS102Q04,0,1706756000000.0,i63a147ebd300c26079455ec3fed58aa170,...,,,,,,,,,,
53,10MNV-GC#7f5275e566c9#2a58223131e5a64a6a646b29...,7f5275e566c9,CG-VNM01,item-80,1.0,correct,CPERS102Q04,0,1706866000000.0,i63a147ebd300c26079455ec3fed58aa170,...,,,,,,,,,,
54,10NAC-GC#7f5275e566c9#2a58223131e5a64a6a646b29...,7f5275e566c9,CG-CAN01,item-80,1.0,correct,CPERS102Q04,0,1706752000000.0,i63a147ebd300c26079455ec3fed58aa170,...,,,,,,,,,,
56,10NRB-GC#7f5275e566c9#2a58223131e5a64a6a646b29...,7f5275e566c9,CG-BRN01,item-80,1.0,correct,CPERS102Q04,0,1706609000000.0,i63a147ebd300c26079455ec3fed58aa170,...,,,,,,,,,,


In [125]:
df.head(5)

Unnamed: 0,numAttempts,statusCorrect,qtiTitle,maxScore,submissionTime,qtiId,qtiLabel,duration,score,itemEndTime,...,spec_lti_claim_custom_deliverySettings_plugins_disableCommands_enabled,spec_lti_claim_custom_deliverySettings_plugins_preventScreenshot_enabled,spec_lti_claim_custom_deliverySettings_plugins_forceFullScreen_enabled,spec_lti_claim_custom_deliverySettings_plugins_pauseOnBlur_autoresume,spec_lti_claim_custom_deliverySettings_attemptId,spec_lti_claim_custom_portalLaunch,spec_lti_claim_custom_deliverySettings_plugins_pauseOnBlur_enabled,SCORE_TOTAL_MAX,SCORE_RATIO,SCORE_TOTAL
2,1.0,correct,CPERS102Q04,0,1706866000000.0,i63a147ebd300c26079455ec3fed58aa170,CPERS102Q04,1.0,0,1706866000000.0,...,,,,,,,,,,
51,1.0,correct,CPERS102Q04,0,1706756000000.0,i63a147ebd300c26079455ec3fed58aa170,CPERS102Q04,162.0,0,1706695000000.0,...,,,,,,,,,,
53,1.0,correct,CPERS102Q04,0,1706866000000.0,i63a147ebd300c26079455ec3fed58aa170,CPERS102Q04,1.0,0,1706866000000.0,...,,,,,,,,,,
54,1.0,correct,CPERS102Q04,0,1706752000000.0,i63a147ebd300c26079455ec3fed58aa170,CPERS102Q04,11.0,0,1706280000000.0,...,,,,,,,,,,
56,1.0,correct,CPERS102Q04,0,1706609000000.0,i63a147ebd300c26079455ec3fed58aa170,CPERS102Q04,19.0,0,1706608000000.0,...,,,,,,,,,,


In [None]:
json_dat = pd.json_normalize(raw_data_melt.vars.tolist())

meta_cols = [
  # 'battery_id',
  'delivery_execution_id',
  'delivery_id',
  'isDeleted',
  'last_update_date',
  'login',
  'test_qti_id',
  'test_qti_label',
  'test_qti_title',
  'raw_data'
]

id_cols2 = set(raw_data_melt.columns) - set(['vars'])
extra_cols = ['items','vars','values']
final_cols = meta_cols
final_cols.extend(extra_cols)

dat_long = (
  raw_data_melt
  .join(
    json_dat
  )
  .drop(
    ['vars'],
    axis = 1
  )
  .melt(
    var_name = "vars",
    value_name= "values",
    id_vars = id_cols2,
    value_vars = json_dat.columns
  )
  .reindex(
    columns=final_cols
  )
)

dat_long.head(20)

In [None]:
dat = dat_long[dat_long['vars'].str.startswith('responses.')]
dat = dat[dat['vars'].str.endswith('value')]
dat

In [None]:
d = raw_data.raw_data[0]
def flatten_dict(nested_dict):
    res = {}
    if isinstance(nested_dict, dict):
        for k in nested_dict:
            flattened_dict = flatten_dict(nested_dict[k])
            for key, val in flattened_dict.items():
                key = list(key)
                key.insert(0, k)
                res[tuple(key)] = val
    else:
        res[()] = nested_dict
    return res


def nested_dict_to_df(values_dict):
    flat_dict = flatten_dict(values_dict)
    df = pd.DataFrame.from_dict(flat_dict, orient="index")
    df.index = pd.MultiIndex.from_tuples(df.index)
    df = df.unstack(level=-1)
    df.columns = df.columns.map("{0[1]}".format)
    return df

df = nested_dict_to_df(d)

In [None]:
import datetime

timestamp = "1705282053574"
your_dt = datetime.datetime.fromtimestamp(int(timestamp)/1000)  # using the local timezone
print(your_dt.strftime("%Y-%m-%d %H:%M:%S"))

In [None]:
filename = 'datastoreDeliveryResults_d60f257c8d73.json'
string = json.dumps(output)
json_data = json.loads(
  string
    .replace("\\t", "\\\\t")
    .replace("\\n", "\\\\n")
)

# with open(filename, 'w') as output_data:
#   output_data.write(
#     json.dumps(json_data, ensure_ascii=False)
#   )
  

In [None]:
with open(filename) as json_data:
  data = json.load(json_data)
  df = pd.DataFrame(data['data'])
  meta_data = pd.DataFrame(df.metadata.values.tolist()).drop_duplicates()

meta_data


In [None]:
def flatten_json(nested_json, exclude=['']):
    """Flatten json object with nested keys into a single level.
        Args:
            nested_json: A nested json object.
            exclude: Keys to exclude from output.
        Returns:
            The flattened json object if successful, None otherwise.
    """
    out = {}

    def flatten(x, name='', exclude=exclude):
        if type(x) is dict:
            for a in x:
                if a not in exclude: flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(nested_json)
    return out

In [None]:
def flatten(data, new_data):
    """Recursive helper function.

    Args:
        data: nested dictionary.
        new_data: empty dictionary.

    Returns:
        Flattened dictionary.

    """
    for key, value in data.items():
        if isinstance(value, dict):
            flatten(value, new_data)
        if isinstance(value, str) or isinstance(value, int) or isinstance(value, list):
            new_data[key] = value
    return new_data

In [None]:
from functools import reduce

def _explode(df, col):
    df = df.explode(col)

    if isinstance(df.iloc[0][col], list):
        df = _explode(df, col)
    elif isinstance(df.iloc[0][col], object):
        df_child = pd.json_normalize(df[col])
        # To prevent column name collision, add the parent column name as prefix.
        df_child.columns = [f'{col}.{x}' for x in df_child.columns]
        df = pd.concat([df.loc[:, ~df.columns.isin([col])].reset_index(drop=True), df_child], axis=1)
    
    return df

def full_explode_normalize(df):
    # Extract list columns 
    explode_cols = [x for x in df.columns if isinstance(df.iloc[0][x], list)]
    if len(explode_cols) < 1:
        return df
    
    # Explode and normalize the list
    df = reduce(_explode, explode_cols, df)

    return df

df_explode = full_explode_normalize(all_deliveries)
# df
# df_explode.columns
# df_explode.to_csv('df_explode.csv')

In [None]:
df_explode.head(5)

### Sourcing Data from RDBMS tables

In [None]:
# Read sqlite query results into a pandas DataFrame
with sqlite3.connect("movies.sqlite") as conn:
    df = pd.read_sql("SELECT * from movies", conn)
df.head()

# Sourcing data from Webpages

Please visit the url https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)

In [None]:
# get data from url
df_html = pd.read_html('https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)',match = 'by country')
# Let's see how many tables are there with tage ' by county'
print(len(df_html)) # There are 4 tables
# Let's see the first table
df_html[0]