In [1]:
from pathlib import Path
import sys

# Add the parent of the current working directory to the module search path.
# NOTE(review): presumably so modules that live one level above this notebook can be imported — confirm.
sys.path.append(str(Path().absolute().parent))
# sys.path

## Ingestion

Includes:
- Requesting `playlistItems`
- Polling
- Making and executing the request
- Storing the raw information

### Requesting Playlist Items

Concerned with making the request. The next section includes functions for logging requests that the `request_playlist` function can use.

Polling comes last because it uses this section and the next one as building blocks. It also leads naturally into transformation, since the raw JSON files need to be compiled.

In [5]:
import api_key
import json

# need to actually send the request to youtube. params filled out, except for pageToken. logging handled in next section
def request_playlist(youtube, maxResults=50, pageToken=""):
    '''
    Fetch one page of playlistItems from the IGN game reviews playlist and log it.

    youtube: API client exposing playlistItems().list(...).execute().
    maxResults: number of items to ask for in this page (50 in nearly every case).
    pageToken: token of the page to fetch; the default "" is what the first request uses.

    Side effects:
    - appends one row of request metadata to ./db/response.csv (via append_response_csv)
    - writes the response, tagged with its requestNumber, to ./raw/<requestNumber>.json

    Returns the response dictionary with the extra "requestNumber" key.
    '''
    # everything except maxResults and pageToken is fixed for this project
    query = dict(
        part='contentDetails,id,snippet,status',
        playlistId="PLraFbwCoisJBTl0oXn8UoUam5HXWUZ7ES",
        maxResults=maxResults,
        pageToken=pageToken,
    )

    pending = youtube.playlistItems().list(**query)
    number = generate_request_number()

    # timestamps bracket the network call only
    sent_at = get_current_time()
    response = pending.execute()
    received_at = get_current_time()

    # request metadata goes to ./db/response.csv
    append_response_csv({
        "requestNumber": number,
        "timeSent": sent_at,
        "timeReceived": received_at,
        "params": dict(query),
    })

    # the raw payload goes to ./raw as json; conversion to csv is left to the transformation stage
    response["requestNumber"] = number
    raw_path = f"./raw/{number}.json"
    with open(raw_path, "w") as fp:
        json.dump(response, fp)

    return response

### Log responses from `request_playlist`

The following information is recorded for each request and its response:
- `requestNumber`
- `timeSent`
- `timeReceived`
- `params.part`
- `params.playlistId`
- `params.maxResults`
- `params.pageToken`

Logging also helps with handing out request numbers.

In [6]:
# need a place to log info

import pandas as pd

# init/reset response.csv. careful with this function.
def reset_response_csv(target="./db/response.csv"):
    '''
    Overwrite the target csv with a header row only and return the empty dataframe that was written.

    The columns describe one request/response pair logged by request_playlist():
    requestNumber, timeSent, timeReceived and one params.<name> column per request parameter.
    Any rows already in the target are lost, so call this with care.
    '''
    # parameter names mirror the params dict built in request_playlist()
    param_names = ("part", "playlistId", "maxResults", "pageToken")

    columns = ['requestNumber', 'timeSent', 'timeReceived']
    columns.extend('params.' + name for name in param_names)

    headers_df = pd.DataFrame(columns=columns)
    headers_df.to_csv(target, index=False)
    return headers_df

# edit response.csv
def append_response_csv(data_dict, target="./db/response.csv"):
    '''
    Append data_dict as row(s) to the target csv and return the flattened dataframe.

    Nested dictionaries are flattened into dotted column names (e.g. params.part).
    No header is written and columns are emitted in the order they appear in data_dict,
    so the caller must supply keys in the same order as the existing header.
    '''
    flattened = pd.json_normalize(data_dict)
    flattened.to_csv(target, mode='a', header=False, index=False)
    return flattened

# add time requested to the received response
from datetime import datetime, timezone

# just use utc time. "central"
def get_current_time(timezone=timezone.utc):
    '''
    Return the current time as a datetime in the given timezone (UTC by default).
    '''
    return datetime.now(tz=timezone)

def serialize_time(dt):
    '''
    Return the ISO 8601 string produced by dt.isoformat().
    '''
    iso_string = dt.isoformat()
    return iso_string

def deserialize_datetime_string(dt_string):
    '''
    Parse an ISO 8601 string (as produced by serialize_time) back into a datetime.

    dt_string: text in the format emitted by datetime.isoformat().
    Returns the corresponding datetime; raises ValueError if the string is not valid.
    '''
    # fromisoformat is a classmethod of datetime, not a method of str
    return datetime.fromisoformat(dt_string)

# add request number
def generate_request_number(source="./db/response.csv"):
    '''
    Return the next unused request number according to the request log.

    source: csv with a requestNumber column (see reset_response_csv).
    Returns 0 when the log has no rows yet, otherwise the largest logged number plus one.
    The result is always a plain Python int so it can be written with json.dump,
    which cannot serialize numpy integer types.
    '''
    request_numbers = pd.read_csv(source)['requestNumber']
    latest = request_numbers.max()
    if pd.isna(latest):
        # an empty column has no maximum (NaN), so numbering starts at 0
        return 0
    return int(latest) + 1

# md = {}
# md["requestNumber"] = 1
# md["timeSent"] = 0
# md["timeReceived"] = 2
# md["params"] = {**params}

# append_response_csv(md)

### Polling

This is part ingestion, since pages need to be requested from YouTube and stored in the `raw` folder.
It is also part transformation, since the previously stored video ids need to be compiled to check whether the playlist contains new videos.

In [251]:
# need a place to store our playlist items

def reset_playlist_csv(target="./db/playlist.csv"):
    '''
    Overwrite the target csv with only the playlist header row and return the column names.

    The columns are the flattened playlistItem fields followed by the page-level
    fields prefixed with "meta." (see playlist_response_to_df).
    Any rows already in the target are lost.
    '''
    item_fields = [
        'kind',
        'etag',
        'id',
        'snippet.publishedAt',
        'snippet.channelId',
        'snippet.title',
        'snippet.description',
        'snippet.channelTitle',
        'snippet.playlistId',
        'snippet.position',
        'snippet.resourceId.kind',
        'snippet.resourceId.videoId',
        'contentDetails.videoId',
        'status.privacyStatus',
    ]
    # one url/width/height triple per thumbnail size, in this exact order
    thumbnail_fields = [
        f'snippet.thumbnails.{size}.{field}'
        for size in ('default', 'medium', 'high', 'standard', 'maxres')
        for field in ('url', 'width', 'height')
    ]
    trailing_fields = [
        'snippet.videoOwnerChannelTitle',
        'snippet.videoOwnerChannelId',
        'contentDetails.videoPublishedAt',
        'meta.kind',
        'meta.etag',
        'meta.nextPageToken',
        'meta.pageInfo.totalResults',
        'meta.pageInfo.resultsPerPage',
        'meta.requestNumber',
    ]
    cols = item_fields + thumbnail_fields + trailing_fields

    empty_playlist = pd.DataFrame(columns=cols)
    empty_playlist.to_csv(target, index=False)
    return cols

In [257]:
# just need a function that see if there's a new video in the playlist that's not in the playlist.csv file

# converting dictionary to df just makes querying so much nicer, as seen in request_video_id.isin(old_video_id)
def playlist_response_to_df(playlist_dictionary):
    '''
    Flatten a playlistItems response into a dataframe with one row per playlist item.

    Assumes playlist_dictionary has the shape of a youtube.playlistItems().list() response
    with all parts, plus the requestNumber key attached by request_playlist().

    Page-level fields (kind, etag, nextPageToken, pageInfo.*, requestNumber) are repeated on
    every row under a "meta." prefix; fields missing from the response are tolerated.
    A dataframe allows structured queries (e.g. isin on video ids) instead of document search.
    '''
    page_level_fields = [
        'kind',
        'etag',
        'nextPageToken',
        ['pageInfo', 'totalResults'],
        ['pageInfo', 'resultsPerPage'],
        'requestNumber',
    ]
    return pd.json_normalize(
        playlist_dictionary,
        record_path='items',
        meta=page_level_fields,
        meta_prefix='meta.',
        errors='ignore',
    )

def poll_playlist():
    '''
    Request playlist pages until already-known videos show up; report whether any video was new.

    Video ids of each requested page are compared with the ids stored in ./db/playlist.csv.
    While every video on a page is new and there is a next page, the next page is requested.
    Every request is logged and stored in ./raw by request_playlist().
    Works when playlist.csv has no rows besides the header; the file is (re)initialised
    when it is missing or completely empty.

    Returns True if at least one requested video id is not in playlist.csv, otherwise False.
    TODO: figure out how to consume
    '''
    video_id_col = 'snippet.resourceId.videoId'
    playlist_csv = './db/playlist.csv'

    # we're going to be comparing video ids
    try:
        old_video_ids = pd.read_csv(playlist_csv)[video_id_col]
    except (FileNotFoundError, pd.errors.EmptyDataError):
        # only a missing/blank file is re-initialised; other errors (e.g. an unexpected
        # schema) propagate instead of silently wiping the existing data
        reset_playlist_csv()
        old_video_ids = pd.read_csv(playlist_csv)[video_id_col]

    youtube = api_key.get_youtube()
    response = request_playlist(youtube)
    requested_video_ids = playlist_response_to_df(response)[video_id_col]
    are_old = requested_video_ids.isin(old_video_ids)

    # remember new videos across pages: a later page may contain only old ones
    has_new_videos = bool((~are_old).any())
    videos_polled = len(requested_video_ids)
    print(videos_polled, end='\r')

    # if all the requested ones are new, then check the next page
    nextPageToken = response.get('nextPageToken')
    while (~are_old).all() and nextPageToken:
        response = request_playlist(youtube, pageToken=nextPageToken)
        requested_video_ids = playlist_response_to_df(response)[video_id_col]
        are_old = requested_video_ids.isin(old_video_ids)
        has_new_videos = has_new_videos or bool((~are_old).any())
        nextPageToken = response.get('nextPageToken')
        videos_polled += len(requested_video_ids)
        print(videos_polled, end='\r')

    return has_new_videos

In [213]:
# comment below to see polling
# if ./db/playlist.csv is empty (except for header), should currently display around ~2100-2200 videos
# the same number as the number of videos in the playlist
# NOTE: running this cell calls poll_playlist(), which sends live requests through request_playlist()
# and therefore also writes to ./db/response.csv and ./raw.
has_new_videos = poll_playlist()
has_new_videos

50

False

## Transformation

Includes:
- Consuming
- Consolidating all the raw JSON files into one CSV
- Unnesting dicts

### About combining JSON files

Process:
1. JSON to DataFrame
2. Concat (or stack) DataFrame with previous files
3. Write DataFrame to CSV

Step 0 is to initialize the csv file with, say, the first response.

In [209]:
# helper functions for consume_raw
import json

def gather_jsons_to_df(directory='./raw'):
    '''
    Load every *.json file in directory and stack them into one dataframe.

    Each file is converted with json_to_df(); the frames are concatenated row-wise.
    An empty dataframe is returned when the directory holds no json files.

    Test below.
    # big_df = gather_jsons_to_df()
    # big_df.shape
    '''
    per_file_frames = [json_to_df(json_path) for json_path in Path(directory).glob("*.json")]
    # the leading empty frame keeps pd.concat valid when no files were found
    return pd.concat([pd.DataFrame(), *per_file_frames], axis=0)

def json_to_df(path):
    '''
    Read one stored playlistItems response from path and return it as a dataframe.

    The file is parsed as json and flattened with playlist_response_to_df().

    Tests below.
    # df = json_to_df('./raw/0.json')
    # print(df.shape)
    # print(df['snippet.resourceId.videoId'][:2])
    # df.head(2)

    # paths = Path('./raw').glob("*.json")
    # json_to_df(list(paths)[0])
    '''
    with open(path, 'r') as fp:
        stored_response = json.load(fp)
    return playlist_response_to_df(stored_response)

In [256]:
import shutil

def consume_raw(directory='./raw'):
    '''
    Merge all raw json responses into ./db/playlist.csv, then move them to ./consumed.

    directory: folder holding the json files written by request_playlist().

    Steps:
    1. gather every json in directory into one dataframe (batch comparison in memory)
    2. read the old playlist.csv (re-initialised when missing or blank), concatenate,
       drop duplicate playlist item ids (old rows win) and overwrite the csv
    3. move the json files into ./consumed, creating that folder if needed

    Returns the list of json paths (their locations before the move) that were moved.
    '''
    playlist_csv = "./db/playlist.csv"

    # first gather all jsons to one df, so can perform batch comparison operations in memory
    json_df = gather_jsons_to_df(directory)
    try:
        old_df = pd.read_csv(playlist_csv)
    except (FileNotFoundError, pd.errors.EmptyDataError):
        # only a missing/blank file is re-initialised; other errors should surface
        reset_playlist_csv()
        old_df = pd.read_csv(playlist_csv)

    # choice of writing to csv: read old, combine old and new, drop duplicates, overwrite
    combined_df = pd.concat([old_df, json_df], axis=0).drop_duplicates('id')
    combined_df.to_csv(playlist_csv, index=False, header=True)

    # TODO: add time for when the object has been consumed
    # without an existing target directory shutil.move would rename the first file to "consumed"
    consumed_dir = Path("./consumed")
    consumed_dir.mkdir(parents=True, exist_ok=True)

    # materialise the glob: a generator would be exhausted by the loop and return nothing
    json_files = list(Path(directory).glob("*.json"))
    for j in json_files:
        # str() keeps shutil.move working on Python versions that mishandle Path sources
        shutil.move(str(j), str(consumed_dir))

    return json_files

# would like a reset db playlist function

In [229]:
# merge the json files in ./raw into ./db/playlist.csv and move them to ./consumed
json_files = consume_raw()

In [253]:
poll_playlist()

32

True

In [238]:
# number of rows in the playlist table
# NOTE(review): playlist_df is only assigned in a cell further down (pd.read_csv("./db/playlist.csv")),
# so this cell fails on a fresh kernel unless that cell is run first.
len(playlist_df['id'])

2182

In [236]:
len(playlist_df['id'].unique())

2182

In [231]:
# load the consolidated playlist and count the rows contributed by each request;
# .iloc[:, 0] keeps only the first column of the per-group counts
playlist_df = pd.read_csv("./db/playlist.csv")
playlist_df.groupby('meta.requestNumber').count().iloc[:, 0]

meta.requestNumber
1     50
2     50
3     50
4     50
5     50
6     50
7     50
8     50
9     50
10    50
11    50
12    50
13    50
14    50
15    50
16    50
17    50
18    50
19    50
20    50
21    50
22    50
23    50
24    50
25    50
26    50
27    50
28    50
29    50
30    50
31    50
32    50
33    50
34    50
35    50
36    50
37    50
38    50
39    50
40    50
41    50
42    50
43    32
44    50
Name: kind, dtype: int64

In [17]:
# TODO:
# - function for inting playlist
# - add time of consuming
# - 

# write the first one in so headers are in place
# df = convert_json_to_df(test_json)
# df.to_csv("./db/playlist.csv", index=False)
# TODO:
# function for clearing rows, except headers

#### Tests

##### Test:
- for new column values, only allowed to append them to the first row of the csv file
- algorithm for finding which header to append and putting them in the back

In [125]:
df1 = pd.DataFrame({
    "animal": ["cow", "parrot"],
    "name": ["moomoo", "polly"]
})
df1.to_csv("test.csv", index=False)
df1

Unnamed: 0,animal,name
0,cow,moomoo
1,parrot,polly


In [126]:
df2 = pd.DataFrame({
    "animal": ["scorpion"],
    "name": ["gliscor"],
    "legs": [6]
})
df2

Unnamed: 0,animal,name,legs
0,scorpion,gliscor,6


In [139]:
read_df = pd.read_csv("test.csv")
read_df.head(0)

Unnamed: 0,animal,name


In [138]:
old_columns = pd.Series(read_df.columns)
new_columns = pd.Series(df2.columns)
cols_to_append_mask = ~(new_columns.isin(old_columns)) # append the columns that don't appear in the previous df
cols_to_append = new_columns[cols_to_append_mask]
cols_to_append

2    legs
dtype: object

In [144]:
# then need to write to the first row of the csv file
import csv

# read only the first row (the header) of test.csv into row0
with open('test.csv', newline='') as fp:
    r = csv.reader(fp)
    row0 = next(r)
    

In [159]:
row0.append('llama')

# parse if snippet.resourceId.videoId are new
#     videoId_key = 'snippet.resourceId.videoId'
    
#     received_video_ids = big_df[videoId_key]
#     old_video_ids = old_df[videoId_key]
#     mask = ~(received_video_ids.isin(old_video_ids))
    
    # another idea is just dropping duplicates, rather than avoiding them
    # for now, concat playlist.csv 
    # to_concat = big_df[mask]
    
    # read each file, convert to df, parse if it's actually new, write to csv

Works, but just going to read the csv in with pandas, modify it, and finally copy to csv.

##### Test:
- using the `columns` keyword argument in `to_csv` so that can add new columns while keeping old ones

In [120]:
df1 = pd.DataFrame({
    "animal": ["cow", "parrot"],
    "name": ["moomoo", "polly"]
})
df1.to_csv("test.csv", index=False)
df1

Unnamed: 0,animal,name
0,cow,moomoo
1,parrot,polly


In [121]:
df2 = pd.DataFrame({
    "animal": ["scorpion"],
    "name": ["gliscor"],
    "legs": [6]
})
df2

Unnamed: 0,animal,name,legs
0,scorpion,gliscor,6


In [122]:
read_df = pd.read_csv("test.csv")
read_df.head(0)

Unnamed: 0,animal,name


In [123]:
# really just need a way to get the right column order and call columns=...

# first approach
# just concat an empty df that has the old columns with the new df
to_concat = [
    read_df.head(0),
    df2.head(0)
]
cat_df = pd.concat(to_concat)
print(list(cat_df.columns))
cat_df

['animal', 'name', 'legs']


Unnamed: 0,animal,name,legs


In [119]:
# df1.to_csv('test.csv', mode='a', header=False, index=False, columns=list(cat_df.columns))

In [124]:
# see if appending to csv with the concatenated df will add the new columns
df2.to_csv('test.csv', columns=list(cat_df.columns), mode='a', header=False, index=False)

Does not work. Still squeezes the extra value into the last column.

##### Test:
- CSV file has 2 headers
- Append to that CSV a dataframe with 3 headers

Does the CSV "properly" update?

In [88]:
df1 = pd.DataFrame({
    "animal": ["cow", "parrot"],
    "name": ["moomoo", "polly"]
})
df1.to_csv("test.csv", index=False)
df1

Unnamed: 0,animal,name
0,cow,moomoo
1,parrot,polly


In [89]:
df2 = pd.DataFrame({
    "animal": ["scorpion"],
    "name": ["gliscor"],
    "legs": [8]
})
df2

Unnamed: 0,animal,name,legs
0,scorpion,gliscor,8


In [90]:
df2.to_csv("test.csv", mode="a", index=False, header=True) # maybe it's the header argument?

Doesn't seem like there's a way to directly add a column to a CSV file. You would have to read the CSV file as a data frame, concatenate it with the new one, then write to CSV (not append). Surprised that a method does not exist to do that efficiently.

For now, if there's a new column, rewrite the file in write mode; otherwise, append.

##### Test: Read CSV with fewer headers/columns than in-memory data frames

In [31]:
df1 = pd.read_csv("test.csv")
df1

Unnamed: 0,animal,name
0,cow,moo
1,parrot,polly


In [32]:
df2 = df1.copy()
df2['legs'] = [4, 2]
df2

Unnamed: 0,animal,name,legs
0,cow,moo,4
1,parrot,polly,2


In [35]:
df3 = pd.concat([df1, df2], ignore_index=True)
df3

Unnamed: 0,animal,name,legs
0,cow,moo,
1,parrot,polly,
2,cow,moo,4.0
3,parrot,polly,2.0


In [36]:
df3.to_csv("test.csv", index=False)

In [37]:
df4 = pd.read_csv("test.csv")
df4

Unnamed: 0,animal,name,legs
0,cow,moo,
1,parrot,polly,
2,cow,moo,4.0
3,parrot,polly,2.0
