In [6]:
import json
import boto3
import requests
import pandas as pd
from datetime import datetime

## Get files from the bucket
**NOTE:** No steps will work here unless you've set the AWS credentials in the poetry shell: 
- `export AWS_PROFILE=<your profile name>`

In [9]:
s3 = boto3.resource('s3')
bucket = s3.Bucket('test-event-storage-bucket')

In [10]:
# Load all the file keys (each file is a batched output from the kinesis firehose)
file_keys = [obj.key for obj in bucket.objects.filter(Prefix='events/')]
# Print the first 5 in the list
file_keys[:5]

['events/2020/12/04/12/my-pipeline-firehose-1-2020-12-04-12-59-41-03d12bbb-ef41-49db-a220-ae7deee1d0e8',
 'events/2020/12/04/13/my-pipeline-firehose-2-2020-12-04-13-20-39-3078e44b-64aa-4b41-b468-9d1fef64dfdf',
 'events/2020/12/04/13/my-pipeline-firehose-2-2020-12-04-13-22-04-6c149022-3cf7-4e0b-aa6a-eeec48abcb35']

In [14]:
# Read a selected file into memory

# Create the file as an object
#file = s3.Object(bucket.name, file_keys[0])
file = s3.Object(bucket.name, file_keys[1])
# Read (download) the object
json_line = file.get()['Body'].read().decode().strip()
# In the pipeline .put_record() method we added a \n so that we now can split on it to seperate the events
json_list = [json.loads(x) for x in json_line.split('\n')]
# Print the first two events
json_list[:2]

[{'geo_country': 'SE',
  'device': 'Desktop',
  'user_agent': 'python-requests/2.24.0',
  'event_data': {'event': 'hej', 'category': 'test', 'example_var': 123},
  'utc_timestamp': 1607088039.2297587},
 {'geo_country': 'SE',
  'device': 'Desktop',
  'user_agent': 'python-requests/2.24.0',
  'event_data': {'event': 'hej',
   'category': 'test',
   'example_var': 123,
   'ip': 1234567890},
  'utc_timestamp': 1607088082.3295436}]

In [15]:
# Turn the Jsons into a dataframe so we can work with them
df = pd.json_normalize(json_list, sep='_')
# Convert timestamp to humanly readable format
df.loc[:, 'utc_timestamp'] = pd.to_datetime(df.utc_timestamp, unit='s')

In [16]:
df

Unnamed: 0,geo_country,device,user_agent,utc_timestamp,event_data_event,event_data_category,event_data_example_var,event_data_ip
0,SE,Desktop,python-requests/2.24.0,2020-12-04 13:20:39.229758739,hej,test,123,
1,SE,Desktop,python-requests/2.24.0,2020-12-04 13:21:22.329543591,hej,test,123,1234568000.0


## Working with dates
Even better is to work with dates so you don't load everything at once.

In [17]:
available_dates = set([datetime.strptime('-'.join(f.split('/')[1:4]), '%Y-%m-%d').date() for f in file_keys])
print('Available dates:', available_dates)

Available dates: {datetime.date(2020, 12, 4)}


In [18]:
# Function to load specific file based on path
def load_file(file_path):
    file = s3.Object(bucket.name, file_path)
    return file.get()['Body'].read().decode().strip()

# Function to load everything existing under a specific date folder
def load_date(date_str: str) -> list:
    date = datetime.strptime(date_str, '%Y-%m-%d').date()
    date_path = '/'.join(str(date).split('-'))
    filter_str = f'events/{date_path}/'
    file_keys = [obj.key for obj in bucket.objects.filter(Prefix=filter_str)]
    json_list = []
    for f in file_keys:
        json_list.extend([json.loads(x) for x in load_file(f).split('\n')])
    return json_list

In [22]:
# Load specific date (string converted into date object in function)
jl = load_date('2020-12-04')

In [23]:
df = pd.json_normalize(jl, sep='_')

In [24]:
df

Unnamed: 0,geo_country,device,user_agent,utc_timestamp,event_data_event,event_data_category,event_data_example_var,event_data_ip
0,SE,Desktop,python-requests/2.24.0,1607087000.0,hej,test,123,
1,SE,Desktop,python-requests/2.24.0,1607088000.0,hej,test,123,
2,SE,Desktop,python-requests/2.24.0,1607088000.0,hej,test,123,1234568000.0
3,SE,Desktop,python-requests/2.24.0,1607088000.0,hej,test,123,1234568000.0
