In [1]:
import pandas as pd
import numpy as np
import json
import datetime
from tqdm import tqdm
from aws_s3 import session, s3_client
import dateutil.parser
import psycopg2
from sqlalchemy import create_engine
# NOTE(review): not referenced by any cell shown in this notebook; the
# SQLAlchemy engine URL further down connects to database 'jsdb', not this one.
dbname = 'r1db'
import logging
import sys

# Route log records to stdout so they appear in the notebook's cell output,
# at INFO level and with a timestamped single-line format.
stdout_handler = logging.StreamHandler(stream=sys.stdout)
handlers = [stdout_handler]

logging.basicConfig(handlers=handlers,
                    format='[%(asctime)s] %(levelname)s - %(message)s',
                    level=logging.INFO)

logger = logging.getLogger('LOGGER_NAME')

# logger.info('Info %s', 'rally_clicks_loader_logging')

In [4]:
class ClickStore:
    '''
    In-memory bookmark for the click loader: the number of clicks loaded so
    far and the timestamp of the most recent one.
    '''
    # Class-level fallbacks; every instance overrides both in __init__.
    click_ct = 0
    last_dt = datetime.datetime.utcnow()

    def __init__(self, click_ct = 0, last_dt = None):
        '''
        :param click_ct: number of clicks already loaded.
        :param last_dt: timestamp of the last loaded click; defaults to the
            current (naive) UTC time at the moment the store is created.
        '''
        self.click_ct = click_ct
        # Resolve the default here rather than in the signature: a default
        # expression in the signature is evaluated only once, when the class
        # is defined, so every store would start from that stale moment.
        self.last_dt = datetime.datetime.utcnow() if last_dt is None else last_dt

    def __str__(self):
        return 'Click Count: {click_ct} \tDate of Last Click: {last_dt}'\
            .format(click_ct = self.click_ct,
                    last_dt = self.last_dt)

In [7]:
def _aws_key_time(k):
    '''
    Given the AWS key name, find the time embedded in it
    This function is, obviously, specific to the naming conventions for Rebrandly data being dropped into AWS
    '''
    k2 = k[40:][:19]
    return datetime.datetime.strptime(k2, "%Y-%m-%d-%H-%M-%S")


def _naive_utc(dt):
    '''
    Return ``dt`` as a naive datetime in UTC.

    Naive values are returned unchanged (this notebook builds them with
    ``datetime.datetime.utcnow()``, i.e. already UTC). Aware values are
    converted to UTC and stripped of tzinfo, so they can be compared with the
    naive times parsed from S3 key names.
    '''
    if dt.utcoffset() is not None:
        dt = dt.astimezone(datetime.timezone.utc).replace(tzinfo=None)
    return dt


def _hour_prefixes(start, end):
    '''
    Yield the S3 key prefix ``clicks/YYYY/MM/DD/HH`` for every hour from the
    hour containing ``start`` through the hour containing ``end``, inclusive.
    Yields nothing if ``start`` is later than ``end``.
    '''
    hour = start.replace(minute=0, second=0, microsecond=0)
    last_hour = end.replace(minute=0, second=0, microsecond=0)
    while hour <= last_hour:
        yield 'clicks/' + '/'.join([format(hour.year, '04d'),
                                    format(hour.month, '02d'),
                                    format(hour.day, '02d'),
                                    format(hour.hour, '02d')])
        hour += datetime.timedelta(hours=1)


def _persist_progress(store, last_click):
    '''
    Advance ``store`` to ``last_click`` and refresh its click count. Save the
    store when it has a ``save`` method (as a Django model would).
    '''
    store.last_dt = last_click.timestamp
    store.click_ct = Click.objects.count()
    # The ClickStore class in this notebook has no save(); only call it if present.
    save = getattr(store, 'save', None)
    if callable(save):
        save(update_fields=['click_ct', 'last_dt'])


def fetch_clicks(self):
    '''
    Fetch all clicks written to the ``rally-link`` bucket after
    ``self.last_dt``. Meant to be called every minute on the same store.

    Lists the hourly ``clicks/YYYY/MM/DD/HH`` prefixes from the hour of
    ``self.last_dt`` through the current UTC hour. For every file whose
    name-embedded time is later than ``self.last_dt``, it creates one click per
    JSON line. After each file that yielded clicks, ``self.last_dt`` and
    ``self.click_ct`` are advanced, and saved if the store supports it.

    NOTE(review): ``Click.objects.create_click`` / ``Click.objects.count`` are
    Django-manager style calls. The ``Click`` class defined in this notebook
    has no ``objects`` attribute, so this needs a Click model from elsewhere.
    NOTE(review): files are selected by their *file-name* time, but
    ``self.last_dt`` is then set to the *click* timestamp of the last record
    loaded. If a file is named later than the clicks inside it, it will look
    new again on the next call. Confirm that create_click de-duplicates.
    '''
    loaded_clicks = 0
    paginator = s3_client.get_paginator('list_objects_v2')

    logger.info('starting click loads: %s', self)
    # Snapshot the cutoff once so every file is judged against the same time,
    # even though self.last_dt advances while files are being loaded.
    since = _naive_utc(self.last_dt)

    for prefix in _hour_prefixes(since, datetime.datetime.utcnow()):
        for page in paginator.paginate(Bucket='rally-link', Prefix=prefix):
            # Pages with no keys carry no 'Contents' entry.
            for content_item in page.get('Contents', []):
                aws_filename = content_item['Key']
                if _aws_key_time(aws_filename) <= since:
                    continue

                click_obj = s3_client.get_object(Bucket='rally-link', Key=aws_filename)
                click_text = str(click_obj['Body'].read(), 'utf-8')
                # One JSON record per line (assumed). Splitting on newlines
                # instead of any whitespace keeps a record intact when a value
                # contains a space, e.g. a multi-word city name.
                click_strings = [line for line in click_text.split('\n') if line.strip()]
                logger.info('found %d click_strings', len(click_strings))

                file_last_click = None
                for click_string in click_strings:
                    click_data = json.loads(click_string)
                    click_data['aws_file'] = aws_filename
                    file_last_click = Click.objects.create_click(**click_data)
                    loaded_clicks += 1

                # Only advance the bookmark when this file actually produced clicks.
                if file_last_click is not None:
                    _persist_progress(self, file_last_click)

            if loaded_clicks:
                logger.info('Found and loaded %d more clicks, count now is %d', loaded_clicks, self.click_ct)

    if loaded_clicks:
        logger.info('Loaded %d additional clicks', loaded_clicks)

In [8]:
# Create a store and attach the module-level fetch_clicks to it as a bound
# method, so it can be invoked as cs.fetch_clicks().
cs = ClickStore()
cs.fetch_clicks = fetch_clicks.__get__(cs, type(cs))
print(cs)

Click Count: 0 	Date of Last Click: 2021-01-30 00:31:20.923877


In [9]:
# Run one load: fetch click files newer than cs.last_dt and advance the store.
cs.fetch_clicks()

[2021-01-29 20:01:30,715] DEBUG - Loading JSON file: /home/singhj/anaconda3/envs/comp119/lib/python3.7/site-packages/botocore/data/s3/2006-03-01/paginators-1.json
[2021-01-29 20:01:30,717] INFO - starting click loads: Click Count: 0 	Date of Last Click: 2021-01-30 00:31:20.923877
[2021-01-29 20:01:30,719] DEBUG - Event before-parameter-build.s3.ListObjectsV2: calling handler <function set_list_objects_encoding_type_url at 0x7fc4d6ee73b0>
[2021-01-29 20:01:30,720] DEBUG - Event before-parameter-build.s3.ListObjectsV2: calling handler <function validate_bucket_name at 0x7fc4d6ee60e0>
[2021-01-29 20:01:30,721] DEBUG - Event before-parameter-build.s3.ListObjectsV2: calling handler <bound method S3RegionRedirector.redirect_from_cache of <botocore.utils.S3RegionRedirector object at 0x7fc4d6b10f50>>
[2021-01-29 20:01:30,722] DEBUG - Event before-parameter-build.s3.ListObjectsV2: calling handler <bound method S3ArnParamHandler.handle_arn of <botocore.utils.S3ArnParamHandler object at 0x7fc4d6a

[2021-01-29 20:01:31,318] DEBUG - Event before-parameter-build.s3.GetObject: calling handler <function generate_idempotent_uuid at 0x7fc4d6eddef0>
[2021-01-29 20:01:31,323] DEBUG - Event before-call.s3.GetObject: calling handler <function add_expect_header at 0x7fc4d6ee6440>
[2021-01-29 20:01:31,331] DEBUG - Event before-call.s3.GetObject: calling handler <bound method S3RegionRedirector.set_request_url of <botocore.utils.S3RegionRedirector object at 0x7fc4d6b10f50>>
[2021-01-29 20:01:31,333] DEBUG - Event before-call.s3.GetObject: calling handler <function inject_api_version_header_if_needed at 0x7fc4d6ee77a0>
[2021-01-29 20:01:31,334] DEBUG - Making request for OperationModel(name=GetObject) with params: {'url_path': '/rally-link/clicks/2021/01/30/00/rb-clicks-stream-1-2021-01-30-00-38-16-8890d40e-5384-45d6-a38b-2378343e0a43', 'query_string': {}, 'method': 'GET', 'headers': {'User-Agent': 'Boto3/1.16.56 Python/3.7.4 Linux/5.4.0-65-generic Botocore/1.19.56'}, 'body': b'', 'url': 'http

NameError: name 'Click' is not defined

In [2]:
class Click:
    '''
    Helpers for flattening a raw click record (one decoded JSON object from
    the S3 click files) into the fields used to create a click.

    NOTE(review): fetch_clicks() calls ``Click.objects.create_click`` and
    ``Click.objects.count``. This class defines no ``objects`` attribute, so it
    does not satisfy that usage on its own.
    '''

    # (output key, nested keys inside the raw record, maximum length)
    _OPTIONAL_FIELDS = (
        ('client_country', ('client', 'location', 'country'), 32),
        ('client_region', ('client', 'location', 'region'), 32),
        ('client_city', ('client', 'location', 'city'), 32),
        ('agent_type', ('client', 'agent', 'type'), 8),
        ('agent_browser', ('client', 'agent', 'browser', 'name'), 16),
        ('agent_os_version', ('client', 'agent', 'os', 'version'), 16),
        ('agent_device', ('client', 'agent', 'device', 'name'), 16),
    )

    @staticmethod
    def _nested_text(data_in, path, max_len):
        '''
        Return ``data_in[path[0]][path[1]]...`` truncated to ``max_len``. Return
        ``''`` when a key is missing or a value along the path cannot be
        indexed or sliced.
        '''
        value = data_in
        try:
            for key in path:
                value = value[key]
            return value[:max_len]
        except (KeyError, TypeError):
            return ''

    @staticmethod
    def create_click_dict(data_in):
        '''
        Build a flat dict of click fields from a raw click record.

        :param data_in: decoded click record (a dict).
        :returns: a dict containing ``timestamp`` and ``external_id`` (plus
            ``unique='T'`` when the client session has a ``'first'`` key).
            It also holds the optional client/agent fields, each truncated to a
            fixed maximum length and defaulting to ``''``. Returns ``None`` if
            a required attribute is missing: client location/agent/session,
            route id, or timestamp.
        '''
        try:
            client = data_in['client']
            # Presence checks only; their sub-fields are read leniently below.
            client['location']
            client['agent']
            init_dict = {
                # NOTE(review): assumes the raw record carries a top-level
                # 'timestamp' (fetch_clicks reads `.timestamp` off created
                # clicks). Confirm against the Rebrandly payload.
                'timestamp': data_in['timestamp'],
                'external_id': data_in['route']['id'],
            }
            if 'first' in client['session']:
                init_dict['unique'] = 'T'
        except (KeyError, TypeError):
            return None

        for out_key, path, max_len in Click._OPTIONAL_FIELDS:
            init_dict[out_key] = Click._nested_text(data_in, path, max_len)
        return init_dict

In [None]:
# fetch_clicks needs a store to work against. Reuse `cs` so each call picks up
# from where the previous one left off (cs.last_dt).
cs.fetch_clicks()

In [None]:
import logging
import sys

stdout_handler = logging.StreamHandler(sys.stdout)
handlers = [stdout_handler]

# basicConfig() does nothing when the root logger already has handlers (the
# first cell installs one), so by itself it would not switch the level to
# DEBUG. Set the root level explicitly as well. (basicConfig(force=True)
# would also work, but it needs Python 3.8+.)
logging.basicConfig(
    level=logging.DEBUG, 
    format='[%(asctime)s] %(levelname)s - %(message)s',
    handlers=handlers
)
logging.getLogger().setLevel(logging.DEBUG)

logger = logging.getLogger('LOGGER_NAME')

logger.info('Info %s', 'rally_clicks_loader_logging')

In [None]:
# NOTE(review): stale cell. `secrets` is not defined in any cell shown here, and
# fetch_clicks() as defined above takes only the store (`self`) and has no
# return statement. So this call raises before `cum_df` is ever assigned.
# The to_sql / to_csv cells below depend on `cum_df` from this cell.
start_dt = datetime.datetime(2021,1,15)
cum_df = fetch_clicks(secrets, start_dt)
cum_df

In [None]:
import os
from getpass import getpass
from urllib.parse import quote_plus

# Never hard-code the database password in the notebook. Read it from the
# environment, or prompt for it. NOTE(review): a password was previously
# committed in this cell; it should be rotated.
db_password = os.environ.get('RALLY_DB_PASSWORD') or getpass('Postgres password for djangouser: ')
# URL-escape the password so characters like '@' or '/' don't break the URL.
engine = create_engine(
    'postgresql+psycopg2://djangouser:{pw}@turtles.cc8pxmbcbosi.us-east-1.rds.amazonaws.com:5432/jsdb'
    .format(pw=quote_plus(db_password))
)
cum_df.to_sql('clicks_few_jan25', engine, index=False)

In [None]:
# Dump the frame (index included) to a scratch CSV.
cum_df.to_csv(path_or_buf='/tmp/foobar.csv')

In [None]:
import pytz
a = '2020-11-01T06:55:10.886334Z'
def gt(dt_str):
    '''
    Parse an ISO-8601 UTC timestamp such as ``2020-11-01T06:55:10.886334Z``
    into a naive datetime.

    The trailing ``Z`` and the fractional seconds are both optional. The
    fraction is read as a decimal fraction of a second (``.5`` is 500000
    microseconds), and digits beyond microsecond precision are truncated.
    '''
    whole, _, frac = dt_str.rstrip("Z").partition(".")
    parsed = datetime.datetime.strptime(whole, "%Y-%m-%dT%H:%M:%S")
    # Right-pad to 6 digits so the fraction scales to microseconds correctly.
    us = int(frac.ljust(6, "0")[:6], 10) if frac else 0
    return parsed + datetime.timedelta(microseconds=us)
def gmt(dt_str):
    '''Parse ``dt_str`` with gt() and attach UTC (pytz.UTC) as its timezone.'''
    naive_utc = gt(dt_str)
    return naive_utc.replace(tzinfo=pytz.UTC)

print(gmt(a))

In [None]:
# !pip install psycopg2-binary

In [None]:
import datetime
tstamp = '2021-01-24T04:01:30.713573Z'
# datetime.fromisoformat() accepts a trailing 'Z' only from Python 3.11 on,
# so spell the UTC designator out as an explicit '+00:00' offset.
datetime.datetime.fromisoformat(
    tstamp.replace('Z', '+00:00')
)

In [None]:
last_clicks = 'clicks/2020/12/26/05/'
time_clicks = 'clicks/2021/01/27/12/'
# 'clicks/YYYY/MM/DD/HH/' -> (YYYY, MM, DD, HH); empty segments are dropped.
tup1 = tuple(int(part) for part in last_clicks.split('/')[1:] if part)
tup2 = tuple(int(part) for part in time_clicks.split('/')[1:] if part)
# Whole hours between the two prefixes.
hrs = int((datetime.datetime(*tup2) - datetime.datetime(*tup1)).total_seconds() // 3600)
hrs

In [None]:
# Print each hour from tup1 through tup1 + hrs hours, inclusive. Iterating
# range(hrs + 1) covers the end hour directly instead of relying on the loop
# variable leaking out of the loop, which is undefined or stale when hrs == 0.
for hr in range(hrs + 1):
    print(datetime.datetime(*tup1) + datetime.timedelta(hours=hr))

In [None]:
# boto3 is used below but was never imported in this notebook.
import boto3

bucket_name = 'rally-link'
folder_name = 'clicks'
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)

def find_files_yrmo(bucket, folder_name, yr, mo, max_files=11):
    '''
    List keys under ``<folder_name>/YYYY/MM`` whose S3 last-modified time falls
    within that calendar month.

    :param bucket: a boto3 ``Bucket`` resource, or anything whose
        ``objects.filter(Prefix=...)`` yields items with ``key`` and
        ``last_modified``.
    :param folder_name: top-level folder, e.g. ``'clicks'``.
    :param yr: four-digit year.
    :param mo: month, 1-12.
    :param max_files: stop once this many keys have been collected (a
        sampling cap, default 11). Pass ``None`` to collect every match.
    :returns: list of matching keys, in listing order.
    '''
    deep_folder_name = folder_name + ('/%04d/%02d' % (yr, mo))
    print('deep folder', deep_folder_name)

    # The month window does not depend on the file, so compute it once.
    strt_date = datetime.datetime(yr, mo, 1)
    if mo == 12:
        last_date = datetime.datetime(yr + 1, 1, 1)
    else:
        last_date = datetime.datetime(yr, mo + 1, 1)

    files = []
    for aws_file in bucket.objects.filter(Prefix = deep_folder_name):
        file_date = aws_file.last_modified.replace(tzinfo = None)
        # Listings come back in key order, not last-modified order. A file
        # modified after the month ends (e.g. written just past midnight)
        # says nothing about the keys after it, so skip it rather than stop.
        if not (strt_date <= file_date < last_date):
            continue
        files.append(aws_file.key)
        if max_files is not None and len(files) >= max_files:
            break
    return files

files = find_files_yrmo(bucket, folder_name, 2020, 11)
logger.info('Loading %d files', len(files))

for filename in files:
    click_obj = s3_client.get_object(Bucket=bucket_name, Key=filename)
    click_text = str(click_obj['Body'].read(), 'utf-8')
    # One JSON record per line (assumed). Splitting on newlines instead of any
    # whitespace keeps a record intact when a value contains a space, e.g. a
    # multi-word city name.
    for click_string in click_text.split('\n'):
        if not click_string.strip():
            continue
        click_data = json.loads(click_string)
