In [1]:
!aws s3 rm s3://itversitydata/landing/ghactivity --recursive

delete: s3://itversitydata/landing/ghactivity/2022-06-02-5.json.gz
delete: s3://itversitydata/landing/ghactivity/2022-06-02-4.json.gz
delete: s3://itversitydata/landing/ghactivity/2022-06-02-0.json.gz
delete: s3://itversitydata/landing/ghactivity/2022-06-02-1.json.gz
delete: s3://itversitydata/landing/ghactivity/2022-06-02-6.json.gz


In [2]:
!aws s3 rm s3://itversitydata/raw/ghactivity --recursive

delete: s3://itversitydata/raw/ghactivity/year=2022/month=06/dayofmonth=02/part-2505d17d-e4c9-11ec-a3db-f748962304a8.snappy.parquet
delete: s3://itversitydata/raw/ghactivity/year=2022/month=06/dayofmonth=02/42eb057a-e49d-11ec-8618-acde48001122.snappy.parquet
delete: s3://itversitydata/raw/ghactivity/year=2022/month=06/dayofmonth=02/153561c0-e49d-11ec-8618-acde48001122.snappy.parquet
delete: s3://itversitydata/raw/ghactivity/year=2022/month=06/dayofmonth=02/part-f66706f7-e4c5-11ec-8df5-91c9df4b8e60.snappy.parquet


In [3]:
import boto3

In [4]:
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('jobs')
table.delete_item(Key={'job_id': 'ghactivity_ingest'})
item = {
    'job_id': 'ghactivity_ingest',
    'job_description': 'Ingest data from gmail to s3',
    'is_active': 'Y',
    'baseline_days': 3
}
table.put_item(Item=item)

{'ResponseMetadata': {'RequestId': 'TKTG0SFDVCE8GRV7Q80EJ5CPNBVV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Sun, 05 Jun 2022 13:08:14 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'TKTG0SFDVCE8GRV7Q80EJ5CPNBVV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '2745614147'},
  'RetryAttempts': 0}}

In [5]:
from datetime import datetime as dt
from datetime import timedelta as td
import time

In [6]:
def get_job_details(job_name):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('jobs')
    job_details = table.get_item(Key={'job_id': job_name})['Item']
    return job_details

In [7]:
def get_next_file_name(job_details):
    job_start_time = int(time.mktime(dt.now().timetuple()))
    job_run_bookmark_details = job_details.get('job_run_bookmark_details')
    if job_run_bookmark_details:
        dt_part = job_run_bookmark_details['last_run_file_name'].split('.')[0].split('/')[-1]
        next_file_name = f"{dt.strftime(dt.strptime(dt_part, '%Y-%m-%d-%H') + td(hours=1), '%Y-%m-%d-%-H')}.json.gz"
    else:
        next_file_name = f'{dt.strftime(dt.now().date() - td(days=3), "%Y-%m-%d")}-0.json.gz'
    return job_start_time, next_file_name

In [8]:
import requests

In [9]:
def upload_file_to_s3(file_name):
   res = requests.get(f'https://data.gharchive.org/{file_name}')

   s3_client = boto3.client('s3')
   upload_res = s3_client.put_object(
      Bucket='itversitydata',
      Key=f'landing/ghactivity/{file_name}',
      Body=res.content
   )

   return {
      'last_run_file_name': f's3://itversitydata/landing/ghactivity/{file_name}',
      'status_code': upload_res['ResponseMetadata']['HTTPStatusCode']
   }

In [10]:
def save_job_run_details(job_details, job_run_details, job_start_time):
    dynamodb = boto3.resource('dynamodb')
    job_run_details_item = {
        'job_id': job_details['job_id'],
        'job_run_time': job_start_time,
        'job_run_bookmark_details': job_run_details,
        'create_ts': int(time.mktime(dt.now().timetuple()))
    }
    job_run_details_table = dynamodb.Table('job_run_details')
    job_run_details_table.put_item(Item=job_run_details_item)
    
    job_details_table = dynamodb.Table('jobs')
    job_details['job_run_bookmark_details'] = job_run_details
    job_details_table.put_item(Item=job_details)

In [11]:
job_details = get_job_details('ghactivity_ingest')
job_start_time, next_file = get_next_file_name(job_details)
job_run_details = upload_file_to_s3(next_file)
save_job_run_details(job_details, job_run_details, job_start_time)

In [12]:
!aws s3 ls s3://itversitydata/landing/ghactivity/

2022-06-05 18:38:42  146969588 2022-06-02-0.json.gz


In [13]:
job_details = get_job_details('ghactivity_ingest')
job_start_time, next_file = get_next_file_name(job_details)
job_run_details = upload_file_to_s3(next_file)
save_job_run_details(job_details, job_run_details, job_start_time)

In [14]:
!aws s3 ls s3://itversitydata/landing/ghactivity/

2022-06-05 18:38:42  146969588 2022-06-02-0.json.gz
2022-06-05 18:39:36   99376873 2022-06-02-1.json.gz


In [15]:
dynamodb = boto3.resource('dynamodb')
jobs_table = dynamodb.Table('jobs')
jobs_table.scan()

{'Items': [{'job_description': 'Transform Files from JSON to Parquet',
   'is_active': 'Y',
   'job_id': 'ghactivity_transform'},
  {'job_description': 'Ingest data from gmail to s3',
   'is_active': 'Y',
   'job_id': 'ghactivity_ingest',
   'baseline_days': Decimal('3'),
   'job_run_bookmark_details': {'last_run_file_name': 's3://itversitydata/landing/ghactivity/2022-06-02-1.json.gz',
    'status_code': Decimal('200')}}],
 'Count': 2,
 'ScannedCount': 2,
 'ResponseMetadata': {'RequestId': 'VL95U6UEC33P60DP3BBV3MAS4RVV4KQNSO5AEMVJF66Q9ASUAAJG',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Sun, 05 Jun 2022 13:10:20 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '452',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'VL95U6UEC33P60DP3BBV3MAS4RVV4KQNSO5AEMVJF66Q9ASUAAJG',
   'x-amz-crc32': '1115827526'},
  'RetryAttempts': 0}}

In [16]:
job_run_details_table = dynamodb.Table('job_run_details')
job_run_details_table.scan()

{'Items': [{'create_ts': Decimal('1654306829'),
   'job_id': 'ghactivity_ingest',
   'job_run_bookmark_details': {'last_run_file_name': '2022-06-01-0.json.gz',
    'status_code': Decimal('200')},
   'job_run_time': Decimal('1654306640')},
  {'create_ts': Decimal('1654307311'),
   'job_id': 'ghactivity_ingest',
   'job_run_bookmark_details': {'last_run_file_name': '2022-06-01-0.json.gz',
    'status_code': Decimal('200')},
   'job_run_time': Decimal('1654307273')},
  {'create_ts': Decimal('1654307382'),
   'job_id': 'ghactivity_ingest',
   'job_run_bookmark_details': {'last_run_file_name': '2022-06-01-1.json.gz',
    'status_code': Decimal('200')},
   'job_run_time': Decimal('1654307339')},
  {'create_ts': Decimal('1654323400'),
   'job_id': 'ghactivity_ingest',
   'job_run_bookmark_details': {'last_run_file_name': '2022-06-01-2.json.gz',
    'status_code': Decimal('200')},
   'job_run_time': Decimal('1654323356')},
  {'create_ts': Decimal('1654323809'),
   'job_id': 'ghactivity_ingest'