This notebook provides an end-to-end conversion: from the original CSVs in S3, to Pandas dataframe and then parquet locally, then uploaded into an s3 test bucket and catalogued in Glue.

Experiment:

1. Take a full day of Adobe files
2. Import them to a Pandas DataFrame with all types cast to str
3. Export them to parquet
4. Copy them to S3
5. Run the Glue crawler
6. Set all columns to string (experiment iteration 2 only)
7. Attempt to query with Athena

Something about the data:

The data used is Adobe hit. The data is put in `jp-insights-dst-adobe-sydney` bucket and partitioned in group/year/month/day.


In [None]:
# Packages

import boto3
import re
import os
import pandas as pd

In [None]:
# Pass your own POC credentials

myAccessKey = 'your_aws_access_key'
mySecretKey = 'your_aws_secret_key'


# Start session with POC
s3_client = boto3.client(
    's3',
    aws_access_key_id=myAccessKey,
    aws_secret_access_key=mySecretKey)

s3 = boto3.resource('s3')
glue = boto3.client('glue')

In [None]:
def get_buckets():
    return[bucket.name for bucket in s3.buckets.all()]

get_buckets()

In [None]:
# Download files from bucket
csv_bucket_name = 'jp-insights-dst-adobe-sydney'
prefix_path = 'group/2019/01/01'
bucket = s3.Bucket(csv_bucket_name)


In [None]:
target_files = []
for x in bucket.objects.filter(Prefix=prefix_path):
    if 'hit_data' in x.key: 
        target_files.append(x.key)
        
# or you can run this code, it's shorter        
#target_files = [x.key for x in bucket.objects.filter(Prefix=prefix_path) if 'hit_data' in x.key]

In [None]:
target_files

In [None]:
csv_bucket_name = 'jp-insights-dst-adobe-sydney'
prefix_path = 'group/2019/01/01'
bucket = s3.Bucket(csv_bucket_name)
target_files = [x.key for x in bucket.objects.filter(Prefix=prefix_path) if 'hit_data' in x.key]

for f in target_files:
    names = re.match(prefix_path + '/.*?/v1/(.*)', f)
    file_name = names.group(1)
    s3.meta.client.download_file(csv_bucket_name, f, './%s' % file_name)
    

In [None]:
# Read each downloaded CSV into a DF as all str, then write out to parquet

headers = ['col%d' % x for x in range(0, 1006)]

for f in target_files:
    names = re.match(prefix_path + '/.*?/v1/(.*)', f)
    file_name = names.group(1)
    df = pd.read_csv('./' + file_name, names=headers, dtype=str)
    print(df.head())
    df.to_parquet('./' + file_name.replace('.csv.gz', '.parquet'))
    


In [None]:
# See the dataframe in CSV
df

In [None]:
# Combine parquet file to one folder

parquet_files = [x for x in os.listdir('./') if 'parquet' in x]

In [None]:
# see the parquet file
parquet_files

In [None]:
# Let's get them into our s3 bucket for Glue

for f in parquet_files:
    print(f)
    s3.meta.client.upload_file('./' + f, 'your_parquet_file_in_S3', 'hit_data/' + f)

In [None]:
# Run the crawler to create the table
glue.start_crawler(Name='Your_crawler_name')