In [55]:
import gzip
import json
import shutil
import os
from ast import literal_eval
import pandas as pd
from datetime import datetime
import pyodbc
import sqlalchemy as sal

In [56]:
# Decompress every gzip-compressed JSON export found in the working directory.
# Only '*.json.gz' archives are processed. A plain '.json' file (for example the
# output of an earlier run of this cell) is not gzip data, and its output path
# would be identical to its input path, so opening it for writing would truncate
# the file before it could be read.
for f in os.listdir():
    if f.endswith('.json.gz'):
        with gzip.open(f, 'rb') as f_in:
            # Strip only the trailing '.gz' so 'users.json.gz' becomes 'users.json'.
            with open(f[:-len('.gz')], 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

---
# Data Wrangling

In this section, we will be formatting our JSON files, processing nested JSON objects, and cleaning up the overall structure of the datasets.
## File Formatting

Because invalid formatting was found in the JSON files during data modeling, the code below helps us validate and clean the JSON data for processing. First, run **python -m json.tool filename** in the command line to check whether the file is a valid JSON document. If the file is invalid, you will receive an error; otherwise, the whole file is printed.

If an error is confirmed, run the data pipeline below to produce a clean JSON file. The pipeline uses JSONDecoder.raw_decode() (and its undocumented second parameter) to walk through the data, decoding one valid JSON structure after another and skipping any text found between them. A nice benefit of the built-in json module is that it parses the concatenated JSON documents correctly even if they are inconsistently indented or the separators between them are missing.

Once all our JSON data has been parsed, the cleaned data is written to a new file and unnested at the first level. This should help us identify which JSON objects need to be flattened even further.

**Please note that even after running the JSON files into the data pipeline, the data will still be structured as a JSON array (or list in Python) rather than the standard JSON object (or dict in Python)**

In [57]:
def jsonFormatter(filename, parsed= None, parser= None):
    """Recover the JSON structures stored in ``<filename>.json`` and tabulate them.

    The file is read as UTF-8 text and scanned for JSON structures one after
    another, so a file made of concatenated documents (which is not valid JSON
    as a whole) can still be loaded. At every step the scan jumps to the next
    ``{`` if one exists and only falls back to ``[`` when no ``{`` remains;
    the elements of a top-level array of objects are therefore collected
    individually. Scanning stops at the first position that cannot be decoded
    or when no opening bracket is left.

    Side effect: the recovered structures are written as a single JSON array to
    ``<filename>Clean.json`` (UTF-8, indented by 2 spaces).

    Parameters
    ----------
    filename : str
        Path of the input file without its ``.json`` extension.
    parsed, parser : ignored
        Kept only so existing calls remain valid; a fresh list and a fresh
        ``json.JSONDecoder`` are always used.

    Returns
    -------
    pandas.DataFrame
        The structures normalised one level deep. A column produced from a
        nested key (``outer.inner``) is renamed to its outer key with spaces
        removed, so several nested keys of the same parent yield columns with
        the same name.
    """
    parser = json.JSONDecoder()
    parsed = []  # individually decoded JSON structures, in file order

    # Read with an explicit encoding so the result does not depend on the
    # platform default; the cleaned copy below is written as UTF-8 as well.
    with open('{filename}.json'.format(filename = filename), encoding='utf-8') as f:
        data = f.read()

    head = 0  # current scan position in `data`
    while True:
        # find() returns -1 when nothing is found, so "+ 1 or ..." falls through
        # to the '[' search only when no '{' remains; the final "- 1" restores
        # the real index (or -1 when neither bracket is left).
        head = (data.find('{', head) + 1 or data.find('[', head) + 1) - 1
        if head < 0:  # no opening bracket left: nothing more to decode
            break
        try:
            struct, head = parser.raw_decode(data, head)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            break
        parsed.append(struct)

    # The cleaned array is written out for documentation purposes.
    with open('{filename}Clean.json'.format(filename = filename), 'w', encoding='utf-8') as jsonfile:
        json.dump(parsed, jsonfile, ensure_ascii=False, indent=2)

    df = pd.json_normalize(parsed, max_level = 1)  # unnest the first level only
    # Drop the nested-key part of flattened column names ('a.b' -> 'a').
    df = df.rename(columns=lambda x: x.split('.')[0].replace(' ','') if '.' in x else x)
    return df

In [58]:
# Parse the three Fetch Rewards exports (users, receipts, brands - in that order)
# into first-level-flattened DataFrames.
users, receipts, brands = [
    jsonFormatter(dataset) for dataset in ('users', 'receipts', 'brands')
]

## Flattening Deeply Nested JSON Objects

After formatting our datasets into a desired state, our next step is to flatten the deeply nested objects and extract the JSON arrays (or lists) that are embedded within each key. From there, we can ensure we have access to the values that will inform our analysis later on. 

### Receipts

In order to access the remaining items nested in the Receipts dataset, we need to explode 'rewardsReceiptItemList' so that every item in a receipt's list gets its own row. From there, we make sure every value, especially the NAs, is represented as a dictionary (NAs become empty dictionaries) so that it can be unnested. Finally, we run json_normalize to unnest all keys and values and merge them back into the dataset by index. In the end, each receipt should have one row for each individual item on it.

In [59]:
# Order the receipt columns alphabetically.
receipts = receipts.reindex(columns=sorted(receipts.columns))

In [60]:
# One row per receipt item: explode the item lists, then move the pre-explode row
# label into an 'index' column and continue with a fresh 0..n-1 index.
receipts = receipts.explode('rewardsReceiptItemList').reset_index()

# After the explode each cell holds either an item dict or a missing value
# (receipts without an item list). Missing values are replaced by empty dicts so
# that every cell can be unnested. This is done with a direct type check rather
# than a str() -> literal_eval round trip, which is slower and raises on any dict
# whose repr is not a valid Python literal (e.g. one containing a NaN float).
receipts['rewardsReceiptItemList'] = receipts['rewardsReceiptItemList'].apply(
    lambda item: item if isinstance(item, dict) else {}
)

In [61]:
# Expand every item dict into its own set of columns.
# NOTE(review): record_prefix is supplied without a record_path - presumably it has
# no effect here; ideally this step would use record_path/meta. Confirm.
item_frame = pd.json_normalize(
    receipts['rewardsReceiptItemList'],
    errors='ignore',
    record_prefix='rewardsReceiptItemList',
)
# Rename the item-level 'pointsEarned' (presumably to keep it apart from a
# receipt-level column of the same name - verify), then sort columns alphabetically.
item_frame = item_frame.rename(columns={'pointsEarned': 'pointsEarnedReceipt'})
rewardsReceiptsFlat = item_frame.reindex(columns=sorted(item_frame.columns))

In [62]:
# Attach the flattened item columns to their receipt rows, matching on the row index.
receipts_clean = receipts.merge(
    rewardsReceiptsFlat,
    how='outer',
    left_index=True,
    right_index=True,
)

In [63]:
# Tidy the merged frame: remove the raw item dicts and the saved pre-explode index,
# give the identifier a descriptive name, drop columns that are entirely empty and
# remove duplicated rows. The row index is not reset afterwards.
receipts_clean = (
    receipts_clean
    .drop(columns=['rewardsReceiptItemList', 'index'])
    .rename(columns={'_id': 'receiptId'})
    .dropna(axis=1, how='all')
    .drop_duplicates()
)

## Brands

As a result of the file formatting, the 'cpg' key was unnested; however, the values for the keys 'id' and 'ref' are now contained in two separate columns with duplicated column names, 'cpg'. To access the values, all we need to do is differentiate the columns and unnest the remaining key, 'oid', which we will merge back in by index.

In [64]:
# Make duplicated column names unique: the first occurrence keeps its name, each
# later occurrence gets its running count appended ('cpg', 'cpg1', ...).
column_labels = pd.Series(brands.columns)
occurrence = column_labels.groupby(column_labels).cumcount()
suffixes = occurrence.replace(0,'').astype(str)  # 0 -> '' so first occurrences stay unchanged
brands.columns = brands.columns + suffixes
# Order the brand columns alphabetically.
brands = brands.reindex(columns=sorted(brands.columns))

In [65]:
# Unnest the values of the 'cpg' column one level deep, then prefix every resulting
# column name with 'cpgId'.
cpg_frame = pd.json_normalize(
    brands['cpg'],
    errors='ignore',
    record_prefix='cpg',
    max_level=1,
)
cpgFlatId = cpg_frame.add_prefix('cpgId')

In [66]:
# Attach the unnested cpg id columns to the brand rows, matching on the row index.
brands_clean = brands.merge(
    cpgFlatId,
    how='outer',
    left_index=True,
    right_index=True,
)

In [67]:
# Tidy the merged frame: drop the raw 'cpg' column, give the remaining key columns
# descriptive names, cut any column name at its first '$' (removing the JSON key
# part), drop columns that are entirely empty and remove duplicated rows.
brands_clean = (
    brands_clean
    .drop(columns=['cpg'])
    .rename(columns={'cpg1': 'cpgRef', '_id': 'brandId'})
    .rename(columns=lambda label: label.split('$')[0].replace(' ','') if '$' in label else label)
    .dropna(axis=1, how='all')
    .drop_duplicates()
)

## UNIX Timestamp to DateTime

Now that we have flattened all the JSON objects in our datasets, we will convert the Unix epoch times that are present in some of our variables into regular timestamps. For this purpose, we define a function that divides the Unix epoch (given in milliseconds) by 1000 and converts the resulting seconds to a UTC timestamp. This method applies to both the Users and Receipts datasets.

Ideally, for bigger datasets, you would want to use pandas.to_datetime() method to convert unix epochs given its faster processing but I forewent this for the sake of customization. 


In [68]:
def dateConverter(x):
    """Convert a Unix epoch given in milliseconds to a UTC timestamp string.

    Parameters
    ----------
    x : int or float
        Milliseconds since 1970-01-01 00:00:00 UTC.

    Returns
    -------
    str or pandas.NaT
        The moment formatted as ``'%Y-%m-%d %H:%M:%S'`` in UTC, or ``pd.NaT``
        when ``x`` cannot be converted (missing values, text, out-of-range
        numbers). Note that an already-formatted string is not numeric and is
        therefore coerced to ``pd.NaT`` as well.
    """
    # Function-scope import: the notebook's import cell only brings in `datetime`.
    from datetime import timezone

    try:
        # Timezone-aware replacement for datetime.utcfromtimestamp(), which is
        # deprecated since Python 3.12; the formatted result is the same.
        moment = datetime.fromtimestamp(x / 1000, tz=timezone.utc)
        return moment.strftime('%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError, OverflowError, OSError):
        # Only conversion failures are coerced; unrelated errors such as
        # KeyboardInterrupt are no longer swallowed.
        return pd.NaT

In [69]:
# Convert every receipt column whose name contains 'date' (case-insensitive) from
# epoch milliseconds to a UTC timestamp string.
receipt_date_columns = [name for name in receipts_clean.columns if 'date' in name.lower()]
for name in receipt_date_columns:
    receipts_clean[name] = receipts_clean[name].apply(dateConverter)

In [70]:
# Order the user columns alphabetically.
users = users.reindex(columns=sorted(users.columns))

In [71]:
# Convert every user column whose name contains 'date' (case-insensitive) from
# epoch milliseconds to a UTC timestamp string.
user_date_columns = [name for name in users.columns if 'date' in name.lower()]
for name in user_date_columns:
    users[name] = users[name].apply(dateConverter)

In [72]:
# Remove duplicated user rows, drop columns that are entirely empty and give the
# identifier a descriptive name.
users_clean = (
    users
    .drop_duplicates()
    .dropna(axis=1, how='all')
    .rename(columns={'_id': 'userId'})
)

---
# Table Designing in SQL

In this section, we load the cleaned datasets into SQL Server tables and query them.

## Inserting Data into SQL

In [73]:
# Defined function to create url used for creating engines
def url(server_name, database, driver='ODBC Driver 17 for SQL Server'):
    """Build a SQLAlchemy ``mssql+pyodbc`` connection URL.

    Parameters
    ----------
    server_name : str
        Host name of the SQL Server instance; inserted as given.
    database : str
        Name of the database to connect to; inserted as given.
    driver : str, optional
        Name of the installed ODBC driver. Defaults to the driver the notebook
        has always used.

    Returns
    -------
    str
        ``mssql+pyodbc://<server_name>/<database>?driver=<encoded driver>``.
    """
    # Function-scope import: urllib is not part of the notebook's import cell.
    from urllib.parse import quote_plus

    # The driver name contains spaces, which are not valid in a URL query string;
    # quote_plus encodes them as '+', which is decoded back to a space when the
    # query string is parsed.
    return f'mssql+pyodbc://{server_name}/{database}?driver={quote_plus(driver)}'

In [74]:
# Build the connection URL for the fetchRewards database, create the SQLAlchemy
# engine and open the connection used by the inserts below.
fr = url(server_name='LAPTOP-9Q779EDT', database='fetchRewards')
engine = sal.create_engine(fr)
conn = engine.connect()

In [75]:
# TODO: fix the issue with NVARCHAR.
# Write the cleaned users to the 'users' table (any existing table is replaced),
# sending the rows in batches of 5000.
users_clean.to_sql(
    name='users',
    con=conn,
    if_exists='replace',
    index=False,
    chunksize=5000,
);

In [84]:
# Write the cleaned receipts to the 'receipts' table (any existing table is replaced).
# The frame written is receipts_clean: the name used here before (`test`) is not
# defined anywhere in the visible notebook, so the cell failed on a fresh kernel.
#from sqlalchemy.types import NVARCHAR
receipts_clean.to_sql('receipts', engine, if_exists='replace', index=False);
#dtype={col_name: NVARCHAR for col_name in receipts_clean}

In [77]:
# Write the cleaned brands to the 'brands' table (any existing table is replaced).
brands_clean.to_sql(
    name='brands',
    con=conn,
    if_exists='replace',
    index=False,
);

In [79]:
# Open a direct pyodbc connection to the fetchRewards database for running queries.
connection_parts = (
    "Driver={ODBC Driver 17 for SQL Server};",
    "Server=LAPTOP-9Q779EDT;",
    "Database=fetchRewards;",
    "Trusted_Connection=yes;",
)
cnxn = pyodbc.connect("".join(connection_parts))

In [86]:
# Average totalSpent (cast to DECIMAL(9,2)) for each rewardsReceiptStatus.
avg_spent_sql = """
SELECT r.rewardsReceiptStatus, AVG(CAST([totalSpent] as DECIMAL(9,2))) as avgSpent
FROM receipts r
GROUP BY r.rewardsReceiptStatus;"""

cursor = cnxn.cursor()
cursor.execute(avg_spent_sql)
cursor.fetchall()

[('PENDING', Decimal('28.032448')),
 ('FLAGGED', Decimal('2635.570246')),
 ('FINISHED', Decimal('1244.372934')),
 ('SUBMITTED', None),
 ('REJECTED', Decimal('19.544970'))]