# Imports and setup

In [1]:
import csv
import copy
import datetime
import logging
import os
import re

import duckdb
import praw
import pandas as pd

from dotenv import load_dotenv
from pmaw import PushshiftAPI

In [2]:
load_dotenv()

True

In [3]:
reddit_client_id = os.getenv('REDDIT_CLIENT_ID')
reddit_client_secret = os.getenv('REDDIT_CLIENT_SECRET')

In [4]:
user_agent = 'DEZoomcampProject:notebook:v0.1.0 by /u/wtfzambo'

reddit = praw.Reddit(
    client_id=reddit_client_id,
    client_secret=reddit_client_secret,
    user_agent=user_agent
)

In [5]:
# Test if API client works
for submission in reddit.subreddit("test").hot(limit=10):
    print(submission.title)

test
Testing title
test repost
Secure Enterprise Browsing: Chrome adds enhanced DLP and extension protections
Secure Enterprise Browsing: Chrome adds enhanced DLP and extension protections
test
test
test
test
testdklsd


In [6]:
# Object codes for arg `fullname`

comment = 't1'
redditor = 't2'
submission = 't3'
message = 't4'
subreddit = 't5'
award = 't6'

# Using pmaw

In [7]:
# cpu_count = os.cpu_count()
# threads = cpu_count * 5

In [8]:
api = PushshiftAPI()

In [9]:
api_praw = PushshiftAPI(praw=reddit)

Trying to get my own post

In [10]:
my_post_id = '11xcy2g'
subreddit = 'dataengineering'
my_post = api_praw.search_submissions(subreddit=subreddit, ids=[my_post_id])

Not all PushShift shards are active. Query results may be incomplete.
Not all PushShift shards are active. Query results may be incomplete.


In [10]:
my_post = list(my_post)[0]

In [11]:
my_post['title']

"I don't understand DuckDB"

## Get all posts from a specific  day

In [12]:
today = datetime.date.today()
today = datetime.datetime.combine(today, datetime.time(0, 0))

one_year_ago = today - datetime.timedelta(days=365)

In [13]:
today_ts = int(today.timestamp())
one_year_ago_ts = int(one_year_ago.timestamp())

In [14]:
one_year_posts = api_praw.search_submissions(
    subreddit=subreddit,
    after=one_year_ago_ts,
    until=today_ts
)

In [15]:
one_year_posts_list = list(one_year_posts)

In [16]:
len(one_year_posts_list)

1430

In [25]:
post_ids = [post['id'] for post in one_year_posts_list]

In [28]:
with open('post_ids.csv', 'w', newline='') as f:
    csv_writer = csv.writer(f)
    for row in post_ids:
        csv_writer.writerow([row])

Extract author's info from a specific post

In [19]:
def isPrimitive(obj):
    return not hasattr(obj, '__dict__')

In [20]:
def no_empty_obj(v):
    if not isinstance(v, (list, dict)):
        return v
    return None if len(v) == 0 else v

In [21]:
def clean_up_reddit_object(d: dict):
    return {k: no_empty_obj(v) for k, v in d.items() if isPrimitive(v)}

In [30]:
pd.DataFrame.from_dict([{k: v for k, v in one_year_posts_list[0].items() if isPrimitive(v)}])

Unnamed: 0,comment_limit,comment_sort,approved_at_utc,selftext,author_fullname,saved,mod_reason_title,gilded,clicked,title,...,parent_whitelist_status,stickied,url,subreddit_subscribers,created_utc,num_crossposts,media,is_video,_fetched,_comments_by_id
0,2048,confidence,,We're currently using MS Access as storage fil...,t2_9sk2qr0q,False,,0,False,Serverless DB to replace MS Access,...,all_ads,False,https://www.reddit.com/r/dataengineering/comme...,97665,1681071000.0,0,,False,False,{}


# Test Praw `info` method

In [44]:
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)

for logger_name in ('praw', 'prawcore'):
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)

In [38]:
one_year_post_ids = [f'{submission}_{post["id"]}' for post in one_year_posts_list]

In [47]:
one_year_submissions = reddit.info(fullnames=one_year_post_ids)

In [49]:
one_year_submissions_list = list(one_year_submissions)

Fetching: GET https://oauth.reddit.com/api/info/
Data: None
Params: {'id': 't3_12gu5rt,t3_12gsngo,t3_12gsifx,t3_12gqukv,t3_12gq5jz,t3_12go3xm,t3_12gnnl6,t3_12gmctv,t3_12gkvly,t3_12gjkx1,t3_12gj4wk,t3_12gilc3,t3_12ggg9o,t3_12gdiok,t3_12gdfyl,t3_12gd0ca,t3_12gbz1n,t3_12gbd91,t3_12gb2i9,t3_12ga0pb,t3_12g942e,t3_12g8i52,t3_12g8570,t3_12g851t,t3_12g69fw,t3_12g15y8,t3_12g0x63,t3_12g0546,t3_12fw7gm,t3_12fw1g0,t3_12fswm7,t3_12fp9u5,t3_12fo8lc,t3_12fnem4,t3_12fn03t,t3_12fm8md,t3_12fj2we,t3_12fdqf0,t3_12fd84i,t3_12fcrdg,t3_12fbp1b,t3_12f6old,t3_12f47e2,t3_12f2hty,t3_12ez3ca,t3_12evpjd,t3_12etz55,t3_12ergr0,t3_12erf91,t3_12erdiz,t3_12er9ut,t3_12epuyd,t3_12epsbd,t3_12ep3pl,t3_12eozr9,t3_12eoc5a,t3_12eo3z0,t3_12emkqw,t3_12emkq6,t3_12el758,t3_12ekb6i,t3_12ejxmr,t3_12ejuno,t3_12eiheo,t3_12egjxl,t3_12egefo,t3_12eg4ty,t3_12ee6bw,t3_12ecrzm,t3_12ecp7g,t3_12eaf32,t3_12e9jat,t3_12e8vxo,t3_12e6iqp,t3_12e65p3,t3_12e5kv9,t3_12e589j,t3_12e43ql,t3_12e23yj,t3_12e20s1,t3_12e0lti,t3_12dwoe3,t3_12dtv8u,t3_12dteg3,

Sleeping: 0.98 seconds prior to call
Response: 200 (57233 bytes)
Fetching: GET https://oauth.reddit.com/api/info/
Data: None
Params: {'id': 't3_x4b6fe,t3_x49tok,t3_x494kp,t3_x485jq,t3_x4674x,t3_x45lkj,t3_x45fzd,t3_x454q1,t3_x454d6,t3_x43f7b,t3_x42kb6,t3_x4110m,t3_x3yyxs,t3_x3xaeu,t3_x3xaac,t3_x3ue1m,t3_x3r3g3,t3_x3qm87,t3_x3ol47,t3_x3o0pn,t3_x3mu9c,t3_x3lh09,t3_x3kk6y,t3_x3k1x1,t3_x3jzw5,t3_x3jwru,t3_x3jusx,t3_x3jn63,t3_x3j7i5,t3_x3iei0,t3_x3i7rd,t3_x3htm7,t3_x3f9m8,t3_x3eu4v,t3_x3dn37,t3_x3d0js,t3_x3bb2b,t3_x3bb11,t3_x39lso,t3_x39jnh,t3_x39373,t3_x38xij,t3_x38qk9,t3_x3827q,t3_x37t2r,t3_x37psf,t3_x37gt2,t3_x3761u,t3_x37464,t3_x36v9z,t3_x36twq,t3_x35cp6,t3_x359gz,t3_x34dct,t3_x33x31,t3_x32ald,t3_x302iz,t3_x2zeu2,t3_x2xs61,t3_x2x61z,t3_x2vmuf,t3_x2vfm6,t3_x2tjqm,t3_x2sbay,t3_x2oydj,t3_x2om96,t3_x2ofbd,t3_x2nc4b,t3_x2mt7e,t3_x2mkcc,t3_x2m4cf,t3_x2lxiu,t3_x2lhj8,t3_x2l796,t3_x2l678,t3_x2jh91,t3_x2ioh2,t3_x2gq5f,t3_x2dyni,t3_x2am5k,t3_x2a7p5,t3_x29lkc,t3_x28fmp,t3_x25h72,t3_x23x9q,t3_x23hso

Response: 200 (13567 bytes)


In [50]:
vars(one_year_submissions_list[0])

{'comment_limit': 2048,
 'comment_sort': 'confidence',
 '_reddit': <praw.reddit.Reddit at 0x7fe7a258a170>,
 'approved_at_utc': None,
 'subreddit': Subreddit(display_name='dataengineering'),
 'selftext': "We're currently using MS Access as storage file as we do not have SERVER availability. The Access files are being saved in SharePoint and people connect there using PowerBI (free version) or Excel using PowerQuery.\n\nSo basically we're using that as a data storage.  MS Access seems really stable and hardly data gets error/corrupted as we use for read-only.\n\nThe problem is that MS Access has a DB size limitation of 2GB and we need some free open source alternative. Since we can't have any server (it's for internal team use) I would like to know what is better option:-SQLite-DuckDB\n\nWe get incremental data everyday from retailers which we save in MS Access day by day.\n\nDo you know any other self-contained database with >2B database size limit? We cannot host it or can't get any Sa

# Test Praw for comments in a big thread

In [123]:
# url = "https://www.reddit.com/r/funny/comments/3g1jfi/buttons/"
url = "https://www.reddit.com/r/dataengineering/comments/12sxihg/visualizing_parquet_in_s3_bucket_for_data_analysis/"
my_submission = reddit.submission(url=url)

my_submission_checkpoint = copy.deepcopy(my_submission)

In [124]:
my_submission_play = copy.deepcopy(my_submission_checkpoint)

In [125]:
my_submission_play.num_comments

13

In [126]:
len(my_submission_play._comments_by_id)

13

In [128]:
my_submission_play.comments.replace_more(limit=None)
comments = []
i_max = 0
for i, comment in enumerate(my_submission_play.comments.list()):
    comments.append(comment.body[:10])
    i_max = i
    
print(i_max)

12


In [56]:
comments = [vars(comment) for comment in my_submission_play.comments.list()]

How to get comment's author

In [57]:
comment = comments[2]
redditor = comment['author']

In [64]:
comment['author'].name

'veilofmaya1234'

In [60]:
comment['author_fullname']

't2_9bktr'

Replace `author` object with its actual name

In [67]:
comments_with_author_names = list(map(
    lambda comment: comment | {'author': comment['author'].name}
    if isinstance(comment['author'], praw.models.Redditor)  else comment, comments
))

# Testing duckdb

In [111]:
first_half_comments = comments_with_author_names[:362]
second_half_commments = comments_with_author_names[362:]

In [130]:
first_comments_clean = map(clean_up_reddit_object, first_half_comments)
second_comments_clean = map(clean_up_reddit_object, second_half_commments)

In [131]:
df_comments_first = pd.DataFrame.from_records(first_comments_clean)
df_comments_second = pd.DataFrame.from_records(second_comments_clean)

In [187]:
time_cols = [col for col in df_comments_first.columns if re.match('.*(creat|updat|edit).*', col)]

In [132]:
df_comments_first_head = df_comments_first.head(0)

In [133]:
duckdb.sql('CREATE TABLE IF NOT EXISTS comments AS SELECT * FROM df_comments_first_head')

In [134]:
duckdb.sql('CREATE OR REPLACE TABLE comments AS SELECT * FROM comments UNION BY NAME SELECT * FROM df_comments_first')

In [135]:
duckdb.sql('CREATE OR REPLACE TABLE comments AS SELECT * FROM comments UNION BY NAME SELECT * FROM df_comments_second')

In [195]:
recast_time_columns = ', '.join([f'{col}::int as {col}' for col in time_cols])

In [197]:
duckdb.sql(f'CREATE OR REPLACE TABLE comments AS SELECT * REPLACE ({recast_time_columns}) FROM comments')

How to get table schema for BigQuery

In [198]:
table_schema = duckdb.sql('DESCRIBE TABLE comments').df()[['column_name', 'column_type']]
table_schema.rename({'column_name': 'name', 'column_type': 'type'}, axis=1, inplace=True)

table_schema.to_dict('records')

[{'name': 'total_awards_received', 'type': 'BIGINT'},
 {'name': 'approved_at_utc', 'type': 'INTEGER'},
 {'name': 'author_is_blocked', 'type': 'BOOLEAN'},
 {'name': 'comment_type', 'type': 'INTEGER'},
 {'name': 'awarders', 'type': 'INTEGER'},
 {'name': 'mod_reason_by', 'type': 'INTEGER'},
 {'name': 'banned_by', 'type': 'INTEGER'},
 {'name': 'ups', 'type': 'BIGINT'},
 {'name': 'removal_reason', 'type': 'INTEGER'},
 {'name': 'link_id', 'type': 'VARCHAR'},
 {'name': 'author_flair_template_id', 'type': 'INTEGER'},
 {'name': 'likes', 'type': 'INTEGER'},
 {'name': 'user_reports', 'type': 'INTEGER'},
 {'name': 'saved', 'type': 'BOOLEAN'},
 {'name': 'id', 'type': 'VARCHAR'},
 {'name': 'banned_at_utc', 'type': 'INTEGER'},
 {'name': 'mod_reason_title', 'type': 'INTEGER'},
 {'name': 'gilded', 'type': 'BIGINT'},
 {'name': 'archived', 'type': 'BOOLEAN'},
 {'name': 'collapsed_reason_code', 'type': 'VARCHAR'},
 {'name': 'no_follow', 'type': 'BOOLEAN'},
 {'name': 'author', 'type': 'VARCHAR'},
 {'name':

In [199]:
duckdb.sql('SELECT created FROM comments where created is not null')

┌────────────┐
│  created   │
│   int32    │
├────────────┤
│ 1438891963 │
│ 1438902364 │
│ 1438894741 │
│ 1438893597 │
│ 1438895300 │
│ 1438910423 │
│ 1438893306 │
│ 1438901625 │
│ 1438900393 │
│ 1438892631 │
│      ·     │
│      ·     │
│      ·     │
│ 1438923640 │
│ 1438912030 │
│ 1438972553 │
│ 1438917737 │
│ 1438915752 │
│ 1438912250 │
│ 1438928979 │
│ 1438942233 │
│ 1438911945 │
│ 1438912435 │
├────────────┤
│  775 rows  │
│ (20 shown) │
└────────────┘

# Check local db

In [122]:
try:
    con.close()
except NameError:
    pass

In [116]:
con = duckdb.connect('../assets/dataengineering.duckdb')

In [120]:
con.sql('select * from submissions')

CatalogException: Catalog Error: Table with name submissions does not exist!
Did you mean "system.information_schema.columns"?