# Data Exploration & Analysis
- What does the dataset look like?
- What inconsistencies, anomalies, or patterns do you observe?

Check readme.md first!!!

In [None]:
# Let's get to the data.

from pathlib import Path
import pandas as pd
import json
import json_repair as jr

directory = Path('mixed')



# Count all files and the JSON files separately, to make sure the two counts match.
count_total_files = len([f for f in directory.iterdir() if f.is_file()])
count_json_files = len(list(directory.glob("*.json")))


# Counters for files that load or fail to load into the dataframe, in case we need to debug.
count_correctly_loaded = 0
count_cannot_load = 0


# basic data structures for the interim import
all_data = []
failed_files = [] # list: [filepath, python error]


# to check if all of the files contain the same number of fields (at root level, ignore nested cols)
file_shape = {}


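def put_to_df(row, records):
    """Flatten one parsed JSON document and append its records to `records`."""
    flat_data = pd.json_normalize(row, sep='_').to_dict('records')
    records.extend(flat_data)
    # tally the field count of every record, so files with a different shape show up
    for record in flat_data:
        n_fields = str(len(record))
        file_shape[n_fields] = file_shape.get(n_fields, 0) + 1
    return 1 # simply did not fail, so add 1 to a counter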




for json_file in directory.glob('*.json'):
    with open(json_file) as f:
        try:
            data = json.load(f)
            count_correctly_loaded += put_to_df(data, all_data)

        # files that are not valid JSON go into an error list, to be retried a different way below
        except json.JSONDecodeError as jde:
            failed_files.append([json_file, jde])
            continue


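for f in failed_files:
    with open(f[0]) as j:
        content = j.read().strip()
    try:
        p = jr.repair_json(content) # use the json_repair module to auto-fix the JSON text
        d = json.loads(p)
        count_correctly_loaded += put_to_df(d, all_data)
        # we could also write the corrected JSON back out to file, if we wanted to fix the source
    except (ValueError, TypeError) as err: # json.JSONDecodeError is a ValueError
        # still unreadable after repair: count it so the summary below is accurate
        count_cannot_load += 1
        print(f'Could not load {f[0]}: {err}')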


# Keep debugging file errors until an acceptable amount of data is in the dataframe. Summary for a human below.

print(f'There are {count_total_files} files, of which {count_json_files} are JSON. {count_correctly_loaded} ingested into the dataframe successfully, {count_cannot_load} could not load.')
print(f'The column shape of the json files: {file_shape}')


# convert all the json dict objects to a pandas dataframe for easier processing.
df = pd.DataFrame(all_data)


print(f'Double check DF length: {len(df)}')

# This type of error handling could be turned into a task failure notification in Airflow.



In [None]:
# Proof that a file which failed the first JSON parse (error file user_7100 below) was ingested into the data objects correctly.
for x in all_data:
    # .get avoids a KeyError if any record is missing user_id
    if x.get('user_id') == 'd088da63-558d-4008-b378-fde896abafd8':
        print(x)

df.query('user_id=="d088da63-558d-4008-b378-fde896abafd8"')

# What does the dataset look like?
- The data is reasonably well structured, although many fields are in poor health and will have to be handled later.
- We have 6 consistent fields: user_id, name, email, instagram_handle, tiktok_handle, and joined_at.
  - Later, in dbt, let's check for rows with nonsense data (e.g. no PII, no useful keys) that should be purged.
  - There is a nested data structure in advocacy_programs; let's unpack at least that into the main dataframe.
- Normalise per platform for easier mapping and a nicer structure?
  - At this point, migrate it out of pandas.
- ### Explore duplicated user IDs - get the most recent?
  - Implement dbt snapshot sorted by the date stamp?
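
One way to approach the duplicated user IDs question is to keep only the most recent row per `user_id`. The following is a minimal pandas sketch, not the notebook's own method: it assumes `joined_at` is the date stamp to sort on, the sample rows are made up, and `keep_most_recent` is a hypothetical helper name.

```python
import pandas as pd


def keep_most_recent(frame, key="user_id", stamp="joined_at"):
    """Return one row per key, keeping the row with the latest stamp."""
    out = frame.copy()
    # Unparseable dates become NaT instead of raising.
    out[stamp] = pd.to_datetime(out[stamp], errors="coerce")
    # NaT rows sort first, so a valid date always wins over a missing one.
    out = out.sort_values(stamp, na_position="first")
    return out.drop_duplicates(subset=key, keep="last").reset_index(drop=True)


# Made-up rows: user "a" appears twice, user "b" has a broken date.
sample = pd.DataFrame(
    {
        "user_id": ["a", "a", "b"],
        "joined_at": ["2023-01-01", "2024-06-01", "not-a-date"],
        "name": ["old", "new", "only"],
    }
)
deduped = keep_most_recent(sample)
```

A dbt snapshot would track changes over time inside the warehouse; this sketch only shows the "latest row wins" rule on a single load.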


In [None]:
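# Let's unpack advocacy_programs into the parent dataframe.
# Series.explode() takes no column name. The concat below only lines up row-for-row
# with df because every advocacy_programs list holds exactly one item.
unpacked = pd.json_normalize(df['advocacy_programs'].explode().tolist())
newdf = pd.concat([df, unpacked], axis=1)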




# Same pattern for tasks_completed; Series.explode() takes no column name.
tasks = pd.json_normalize(newdf['tasks_completed'].explode().tolist())

- Create a Docker database to upload to: Postgres?
- Unpack tasks_completed into a separate table keyed by user_id
- Clean the data up
- Implement the dbt process and connect it to Postgres
- Sort out the Dockerisation so the setup is easy to reproduce
- If it was production:
  - Airflow task
    - Read the files
    - tidy them
    - upload to postgres
    - run any dbt packages
    - create a simple export file from the database
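
The "tasks completed in a separate table with the user id" step from the list above could look roughly like this. It is a sketch under assumptions: the rows are invented, and the task fields `task_id` and `platform` are hypothetical names, since the real task schema is not shown here.

```python
import pandas as pd

# Hypothetical rows shaped like `newdf`: tasks_completed holds a list of dicts.
sample = pd.DataFrame(
    {
        "user_id": ["u1", "u2"],
        "tasks_completed": [
            [
                {"task_id": "t1", "platform": "TikTok"},
                {"task_id": "t2", "platform": "Instagram"},
            ],
            [{"task_id": "t3", "platform": "TikTok"}],
        ],
    }
)

# One row per task, with user_id repeated next to each task dict.
# Empty lists explode to NaN, so those rows are dropped before flattening.
exploded = (
    sample[["user_id", "tasks_completed"]]
    .explode("tasks_completed")
    .dropna(subset=["tasks_completed"])
    .reset_index(drop=True)
)

# Flatten the task dicts into columns and attach the owning user_id.
task_cols = pd.json_normalize(exploded["tasks_completed"].tolist())
tasks_by_user = pd.concat([exploded[["user_id"]], task_cols], axis=1)
```

Carrying `user_id` in the task table gives dbt an explicit key to join on, instead of relying on the dataframe index written by `to_sql`.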

In [None]:
# Upload the data somewhere so we can use it in dbt-core: 

from sqlalchemy import create_engine

# connect and use the local postgres instance we have running in docker.
engine = create_engine('postgresql://postgres:pass@127.0.0.1:5432/duel')

# this is safe to do in this instance, because advocacy_programs always has only 1 item in it.
newdf[['user_id', 'name', 'email', 'instagram_handle', 'tiktok_handle', 'joined_at']].astype(str).to_sql('source', engine, if_exists='replace', index=True, method='multi', schema='raw')
newdf[['program_id', 'brand', 'total_sales_attributed']].astype(str).to_sql('source__advocacy_programs', engine, if_exists='replace', index=True, method='multi', schema='raw')
tasks.astype(str).to_sql('source__tasks_completed', engine, if_exists='replace', index=True, method='multi', schema='raw')

# df.astype(str).to_sql('source', engine, if_exists='replace', index=True, method='multi', schema='raw')

newdf.head(5)

### Consider this stage done. Data has been ingested and viewed at a basic level.

The data was expanded in Python first, for faster and easier JSON error handling and unpacking. If these JSON lists were longer than 1, we could handle that by repeating the parent index id for each item, giving a one-to-many relationship. However, all lists are length 1 in this dataset.
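
If the lists ever did hold more than one item, the one-to-many case could be sketched with `pd.json_normalize` and its `record_path` and `meta` parameters, which repeat the parent key for every nested item. The records below are invented for illustration and only assume the shape `json.load` returns for one user.

```python
import pandas as pd

# Made-up records: user u1 has two advocacy programs, u2 has one.
records = [
    {
        "user_id": "u1",
        "advocacy_programs": [
            {"program_id": "p1", "brand": "A"},
            {"program_id": "p2", "brand": "B"},
        ],
    },
    {
        "user_id": "u2",
        "advocacy_programs": [{"program_id": "p3", "brand": "C"}],
    },
]

# One row per program, with the parent's user_id repeated on each row.
programs = pd.json_normalize(
    records, record_path="advocacy_programs", meta=["user_id"]
)
```

The child table then holds `user_id` as a foreign key, so the parent table needs no duplicated rows.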