# Notebook pour réaliser l'extraction des posts et commentaires
### Adapté pour être facilement utilisable sur Jupyter dans GCP

In [2]:
import re
import requests
import os
import pandas as pd
from datetime import datetime
import sqlite3
from google.cloud import storage
import numpy as np
import mysql.connector
import sys
import csv
import time
from google.cloud import bigquery
from pyspark.sql import SparkSession

In [3]:
# Shared BigQuery client, created without explicit arguments. The extraction
# functions below read it as a module-level name to query and load the `dwh` tables.
client = bigquery.Client()

## Importer un fichier depuis le bucket

In [4]:
def download_blob(bucket_name, source_blob_name, destination_file_name):
    """Copy one object of a Google Cloud Storage bucket to a local file.

    Args:
        bucket_name: ID of the GCS bucket, e.g. "your-bucket-name".
        source_blob_name: ID of the GCS object, e.g. "storage-object-name".
        destination_file_name: local path the object is written to,
            e.g. "local/path/to/file".
    """
    # `Bucket.blob` only builds a client-side handle: unlike `Bucket.get_blob`
    # it retrieves nothing from Cloud Storage, which is all we need before
    # downloading the content.
    remote_object = storage.Client().bucket(bucket_name).blob(source_blob_name)
    remote_object.download_to_filename(destination_file_name)

## Fonction d'authentification à l'API Reddit

In [5]:
def authenticate_API():
    """
    Authenticate to the Reddit API and build the headers for later queries.

    The credentials file is first downloaded from the 'the-clean-project'
    bucket to 'auth_file.txt'. Its first line must hold six comma-separated
    fields, in this order: client_id, secret_token, grant_type, username,
    password, user_agent.

    Returns:
        dict: the 'User-Agent' and 'Authorization' headers to send with every
        subsequent API request.

    Raises:
        ValueError: if the credentials line does not hold exactly six fields.
        requests.HTTPError: if the token endpoint answers with an error status.
        KeyError: if the token response contains no 'access_token'.
    """
    # Fetch the file containing the login infos, then read them
    download_blob('the-clean-project','notebooks/jupyter/authentication_file.txt','auth_file.txt')

    with open('auth_file.txt', 'r') as f:
        # Drop the line terminator first: otherwise it stays glued to the last
        # field and ends up inside the User-Agent header value.
        credentials = f.readline().rstrip("\r\n").split(",")
    client_id, secret_token, grant_type, username, password, user_agent = credentials

    # note that client_id refers to 'personal use script' and secret_token to 'token'
    auth = requests.auth.HTTPBasicAuth(client_id, secret_token)

    # here we pass our login method (password), username, and password
    data = {'grant_type': grant_type,
            'username': username,
            'password': password}

    # setup our header info, which gives reddit a brief description of our app
    headers = {'User-Agent': user_agent}

    # send our request for an OAuth token
    res = requests.post('https://www.reddit.com/api/v1/access_token',
                        auth=auth, data=data, headers=headers)
    # Fail here, with the HTTP status, rather than on an unrelated parsing error
    res.raise_for_status()

    # convert response to JSON and pull access_token value
    payload = res.json()
    if 'access_token' not in payload:
        raise KeyError(f"no 'access_token' in the Reddit token response: {payload.get('error', payload)}")

    # add authorization to our headers dictionary
    return {**headers, 'Authorization': f"bearer {payload['access_token']}"}


## Le code d'extraction des posts

In [6]:
def df_from_response_for_posts(res, date_extraction):
    """
    Convert one subreddit listing response into a dataframe of posts.

    Args:
        res: response of the GET query made to the Reddit API. Only
            ``res.json()`` is used; the posts are read from
            ``['data']['children']``.
        date_extraction: value copied unchanged into the ``extraction_utc``
            column of every row.

    Returns:
        A dataframe with one row per post and these columns:
        - id_post: post ID;
        - id_subreddit, subreddit: subreddit ID and name;
        - id_author: ``author_fullname`` of the post, None when the key is absent;
        - author: author name;
        - num_comments, subreddit_subscribers, upvote_ratio, ups, downs, score;
        - created_utc: publication time formatted as 'YYYY-MM-DD HH:MM:SS', in UTC;
        - extraction_utc: the ``date_extraction`` argument;
        - kind: type prefix of the entry (e.g. "t3").
        The dataframe is empty (no columns) when the response holds no post.
    """
    from datetime import timezone

    # Collect plain dicts and build the dataframe once: concatenating a
    # one-row dataframe per post copies the accumulated data at every step.
    rows = []
    for post in res.json()['data']['children']:
        post_data = post['data']
        rows.append({
            'id_post': post_data['id'],
            'id_subreddit': post_data['subreddit_id'],
            'subreddit': post_data['subreddit'],
            'id_author': post_data.get('author_fullname'),
            'author': post_data['author'],
            'num_comments': post_data['num_comments'],
            'subreddit_subscribers': post_data['subreddit_subscribers'],
            'upvote_ratio': post_data['upvote_ratio'],
            'ups': post_data['ups'],
            'downs': post_data['downs'],
            'score': post_data['score'],
            # Explicit UTC: without a timezone, fromtimestamp() converts to the
            # local time of the machine, which contradicts the column name.
            'created_utc': datetime.fromtimestamp(post_data['created_utc'], tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
            'extraction_utc': date_extraction,
            'kind': post['kind'],
        })

    return pd.DataFrame(rows)


def posts_extraction(subreddit:str="r/worldnews"):
    """
    Extract the posts of a subreddit listing and insert them into BigQuery.

    The listing is fetched page by page (100 posts per request, chained with
    the ``after`` parameter) until a page comes back with fewer than 100
    posts. All collected rows are then inserted into the existing table
    ``dwh.post``.

    Args:
        subreddit: subreddit name, either bare ("worldnews") or prefixed
            ("r/worldnews"); by default "r/worldnews".

    Reads two module-level names that must exist before the call:
    ``date_extraction`` (written to the ``extraction_utc`` column) and
    ``client`` (the BigQuery client).

    Raises:
        requests.HTTPError: if a listing request answers with an error status.
    """
    import warnings

    # The URL below already carries the "r/" segment: keeping the prefix of
    # the argument would request ".../r/r/<name>".
    name = subreddit[2:] if subreddit.startswith("r/") else subreddit

    # We first authenticate to the API
    headers = authenticate_API()

    pages = []
    params = {'limit': 100}

    # At each loop, we extract up to 100 posts with their info
    while True:
        res = requests.get(f"https://oauth.reddit.com/r/{name}",
                        headers=headers,
                        params=params)
        res.raise_for_status()

        new_df = df_from_response_for_posts(res, date_extraction)
        # An empty page has no last row to paginate from: stop here
        if new_df.empty:
            break
        pages.append(new_df)

        # The fullname (kind + '_' + id) of the last row of the page tells the
        # API where the next page starts
        last_row = new_df.iloc[-1]
        params['after'] = last_row['kind'] + '_' + last_row['id_post']

        # A short page means there is nothing left to fetch
        if len(new_df) < 100:
            break

    if not pages:
        return
    data = pd.concat(pages, ignore_index=True)

    # The table must already exist and have a defined schema
    table = client.get_table(client.dataset('dwh').table('post'))
    errors = client.insert_rows(table, data.values.tolist())
    if errors:
        # Surface rejected rows instead of dropping them silently
        warnings.warn(f"BigQuery rejected {len(errors)} row(s) while loading 'post': {errors[:3]}")


## Le code d'extraction des commentaires

In [8]:
# Module-level queue of comment IDs that still have to be fetched:
# df_replies appends the `children` IDs it meets, children_extraction drains it.
listeChildren = []

def df_replies(resjson, df_comments, id_post, rang, headers):
    """
    Add one entry of a "comments" response, and recursively its replies, to a dataframe.

    Two shapes of entry are handled:
    - an entry whose ``data`` has a ``body``: it is appended to ``df_comments``
      as a 'Comment' row, then its ``replies`` (if any) are walked recursively;
    - an entry without ``body``: the IDs listed in its ``children`` are queued
      in the module-level ``listeChildren`` so they can be requested later.

    Params:
        - resjson : dict, one element of a ``children`` list
        - df_comments : dataframe the rows are appended to
        - id_post : str, written to the ``id_post`` column
        - rang : int, nesting depth; only passed on (incremented) to recursive calls
        - headers : only passed on to recursive calls

    Reads the module-level ``date_extraction`` for the ``extraction_utc`` column.

    Return: the dataframe extended with the comment and its replies
    """
    from datetime import timezone

    entry = resjson['data']

    if "body" in entry:
        row = {
            'id_comment': entry['id'],
            'id_post': id_post,
            'id_author': entry.get('author_fullname'),
            'author': entry['author'],
            'content': entry['body'],
            'type_content': 'Comment',
            'ups': entry['ups'],
            'downs': entry['downs'],
            'score': entry['score'],
            # Explicit UTC: without a timezone, fromtimestamp() converts to the
            # local time of the machine, which contradicts the column name.
            'created_utc': datetime.fromtimestamp(entry['created_utc'], tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
            'extraction_utc': date_extraction,
        }
        df_comments = pd.concat([df_comments, pd.DataFrame([row])], ignore_index=True)
    else:
        # Queue the IDs exactly once. They used to be added a second time when
        # the entry also had no "replies" key, which duplicated the requests
        # and the resulting rows.
        listeChildren.extend(entry['children'])

    # "replies" is an empty string when a comment has no reply, and the key is
    # absent on entries without body
    replies = entry.get('replies')
    if replies:
        for reply in replies['data']['children']:
            df_comments = df_replies(reply, df_comments, id_post, rang + 1, headers)

    return df_comments

def df_from_response_for_comments(res, headers, id_post):
    """
    Build the dataframe of a post and of the comments returned with it.

    Request: comments

    The response JSON is a two-element list: element 0 holds the post itself,
    element 1 the comments. The result starts with two rows describing the
    post, a 'Title' row (content = title) and a 'Post' row (content =
    selftext), both carrying the post's own id as ``id_comment``. Every
    comment is then added through ``df_replies``.

    Params:
        - res : response of the comments request; only ``res.json()`` is used
        - headers : passed on to ``df_replies``
        - id_post : str, written to the ``id_post`` column

    Reads the module-level ``date_extraction`` for the ``extraction_utc`` column.

    Return: the dataframe holding the post rows and the comments
    """
    from datetime import timezone

    # Parse the body once: every call to res.json() decodes it again
    listing = res.json()
    post_data = listing[0]['data']['children'][0]['data']

    # Explicit UTC: without a timezone, fromtimestamp() converts to the local
    # time of the machine, which contradicts the column name.
    created = datetime.fromtimestamp(post_data['created_utc'], tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

    # The two post rows only differ by their content and type
    rows = [{
        'id_comment': post_data['id'],
        'id_post': id_post,
        'id_author': post_data.get('author_fullname'),
        'author': post_data['author'],
        'content': post_data[field],
        'type_content': label,
        'ups': post_data['ups'],
        'downs': post_data['downs'],
        'score': post_data['score'],
        'created_utc': created,
        'extraction_utc': date_extraction,
    } for field, label in (('title', 'Title'), ('selftext', 'Post'))]
    df_comments = pd.DataFrame(rows)

    # add the content of the comments
    for comment in listing[1]['data']['children']:
        df_comments = df_replies(comment, df_comments, id_post, 1, headers)
    return df_comments

def df_from_response_for_children(res, id_post, rang, headers):
    """
    Extract the comments contained in a "morechildren" response.

    Request: morechildren

    Only the entries of ``['json']['data']['things']`` that have a ``body``
    become rows; entries without body are skipped.

    Params:
        - res : response of the morechildren request; only ``res.json()`` is used
        - id_post : str, written to the ``id_post`` column
        - rang : int, unused here
        - headers : unused here

    Reads the module-level ``date_extraction`` for the ``extraction_utc`` column.

    Return: the dataframe of the comments, empty (no columns) when the
    response has no 'json' key or no entry with a body
    """
    from datetime import timezone

    # Parse the body once: every call to res.json() decodes it again
    payload = res.json()
    if 'json' not in payload:
        return pd.DataFrame()

    # Collect plain dicts and build the dataframe once rather than
    # concatenating a one-row dataframe per comment
    rows = []
    for thing in payload['json']['data']['things']:
        thing_data = thing['data']
        if 'body' not in thing_data:
            continue
        rows.append({
            'id_comment': thing_data['id'],
            'id_post': id_post,
            'id_author': thing_data.get('author_fullname'),
            'author': thing_data['author'],
            'content': thing_data['body'],
            'type_content': 'Comment',
            'ups': thing_data['ups'],
            'downs': thing_data['downs'],
            'score': thing_data['score'],
            # Explicit UTC: without a timezone, fromtimestamp() converts to the
            # local time of the machine, which contradicts the column name.
            'created_utc': datetime.fromtimestamp(thing_data['created_utc'], tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
            'extraction_utc': date_extraction,
        })

    return pd.DataFrame(rows)

def children_extraction(df_comments, link_id, rang, headers, id_post):
    """
    Fetch the comments that the first "comments" request did not return.

    Drains the module-level ``listeChildren`` queue in batches of at most 100
    IDs, requests each batch through the API's 'morechildren' endpoint and
    appends the resulting comments to ``df_comments``.

    Params:
        - df_comments : dataframe the new comments are appended to
        - link_id : str, unused (the request rebuilds it as "t3_" + id_post)
        - rang : int, passed on to ``df_from_response_for_children``
        - headers : headers sent with each request
        - id_post : str, ID of the post the comments belong to

    Return: the dataframe with the new comments; ``df_comments`` itself when
    the queue is empty
    """
    # IDs already requested, so an ID queued several times is fetched only once
    already_requested = set()

    while listeChildren:
        # Take the next (at most) 100 queued IDs out of the queue
        chunk = listeChildren[:100]
        del listeChildren[:100]

        batch = [child for child in dict.fromkeys(chunk) if child not in already_requested]
        if not batch:
            continue
        already_requested.update(batch)

        # `children` is sent as a comma-delimited list of IDs
        params = {'link_id': "t3_" + id_post, 'children': ",".join(batch), 'api_type': "json"}

        # https, like every other call to the OAuth API: the headers carry the
        # bearer token and must not travel in clear text
        res = requests.get("https://oauth.reddit.com/api/morechildren",
                            headers=headers, params=params)

        # get dataframe from response
        new_df = df_from_response_for_children(res, id_post, rang, headers)

        # append new_df to data
        df_comments = pd.concat([df_comments, new_df], ignore_index=True)

    return df_comments

def comments_extraction(subreddit:str, id_post:str):
    """
    Extract the title, the body and the comments of one post.

    Params:
        - subreddit : str, subreddit path as inserted in the URL ("r/name")
        - id_post : str, ID of the post, without the "t3_" prefix

    Return: a dataframe with the two post rows ('Title', 'Post'), the comments
    returned by the "comments" request, then those fetched afterwards through
    ``children_extraction``. Nothing is written to disk or to BigQuery here.

    Raises:
        requests.HTTPError: if the comments request answers with an error status.
    """
    # We first authenticate to the API
    headers = authenticate_API()

    # The queue of pending comment IDs is module-level state: drop whatever a
    # previous, interrupted call may have left so it is not fetched for this post
    listeChildren.clear()

    # requests leaves out parameters whose value is None, so no limit is sent
    params = {'limit': None}
    url = f"https://oauth.reddit.com/{subreddit}/comments/{id_post}"

    res = requests.get(url,
                    headers=headers,
                    params=params)
    res.raise_for_status()

    # Post rows + comments directly available in the response
    data = df_from_response_for_comments(res, headers, id_post)
    data['content'] = data['content'].astype("string")

    # Comments that were only referenced by ID in the response
    extra = children_extraction(pd.DataFrame(), "t3_" + id_post, 1, headers, id_post)

    return pd.concat([data, extra], ignore_index=True)
    

## L'extraction complète

In [91]:
def extraction_complete(subreddit:str="worldnews"):
    """
    Extract the posts of a subreddit, then the comments of each of these posts.

    Steps:
    - ``posts_extraction`` loads the posts into the BigQuery table ``dwh.post``;
    - the ids of the posts carrying the current ``date_extraction`` are read
      back from that table;
    - ``comments_extraction`` is run for each id through a Spark RDD;
    - the resulting rows are inserted into the existing table ``dwh.comment``.

    (Extracting the contributors and their recent activity was abandoned: it
    requires too many API requests for now.)

    param:
        subreddit: str, subreddit name, bare ("worldnews") or prefixed
            ("r/worldnews")

    Reads the module-level ``date_extraction`` (format 'YYYY-MM-DD HH:MM:SS')
    and ``client`` (BigQuery client).

    Output:
        None; the data is stored in BigQuery.
    """
    import warnings

    # Work with the bare name: the "r/" prefix is added back where it is needed
    name = subreddit[2:] if subreddit.startswith("r/") else subreddit

    posts_extraction(name)

    # Read back from BigQuery the posts of this extraction
    date = date_extraction[:10]
    heure = date_extraction[11:]

    sql = f"SELECT id_post FROM `mimetic-coral-355913.dwh.post` WHERE extraction_utc = '{date}T{heure}'"

    # BigQuery to Pandas DataFrame
    pandas_df = client.query(sql).to_dataframe()
    # Without any post there is nothing to extract, and reduce() below raises
    # on an empty RDD
    if pandas_df.empty:
        return

    # Reuse the running Spark session, or start one if none exists yet
    spark = SparkSession.builder.getOrCreate()
    # Pandas DF -> Spark DataFrame -> Spark RDD
    rdd = spark.createDataFrame(pandas_df).rdd
    # One dataframe of comments per post...
    per_post = rdd.map(lambda row : comments_extraction(f"r/{name}", row["id_post"]))
    # ...merged into a single dataframe
    comments = per_post.reduce(lambda df1, df2: pd.concat([df1, df2]))

    # The table must already exist and have a defined schema
    table = client.get_table(client.dataset('dwh').table('comment'))
    errors = client.insert_rows(table, comments.values.tolist())
    if errors:
        # Surface rejected rows instead of dropping them silently
        warnings.warn(f"BigQuery rejected {len(errors)} row(s) while loading 'comment': {errors[:3]}")
    


In [66]:
# Scratch cell: read back the ids of the posts of one past extraction.
# `date` and `heure` split date_extraction into its date and time parts, but
# the query below does not use them: its timestamp is hard-coded.
date = date_extraction[:10]
heure = date_extraction[11:]

sql = f"SELECT id_post FROM `mimetic-coral-355913.dwh.post` WHERE extraction_utc = '2022-09-12T14:58:11'"

# BigQuery to Pandas DataFrame
pandas_df = client.query(sql).to_dataframe()

In [85]:
# Scratch cell: same Spark pipeline as extraction_complete, without the final insert.
# NOTE(review): `results` and `spark` are not defined in the visible cells;
# `results` presumably holds the post ids (an `id_post` column) - confirm.
subreddit = "PlutoTV"
# Pandas DF to Spark DataFrame
spark_df = spark.createDataFrame(results)
# Spark DF to Spark RDD
rdd = spark_df.rdd
# Map each post row to the dataframe of its comments
rdd2 = rdd.map(lambda row : comments_extraction(f"r/{subreddit}", row["id_post"]))
# Merge the per-post dataframes into a single one
rdd3 = rdd2.reduce(lambda df1, df2: pd.concat([df1, df2]))

                                                                                

In [86]:
# Display the result of the reduce: despite its name, a pandas dataframe (output below)
rdd3

Unnamed: 0,id_comment,id_post,id_author,author,content,type_content,ups,downs,score,created_utc,extraction_utc,score_jigsaw
0,qlombx,qlombx,t2_a2fwzt60,roscoethehorse,Explicit lyrics,Title,0,0,0,2021-11-03 06:38:41,2022-09-12T14:58:11,
1,qlombx,qlombx,t2_a2fwzt60,roscoethehorse,Is there a way to block songs with explicit ly...,Post,0,0,0,2021-11-03 06:38:41,2022-09-12T14:58:11,
0,s5hymi,s5hymi,t2_7f8bjbav,Itchy-Throat-4779,BOycotting PlutoTV,Title,0,0,0,2022-01-16 18:24:32,2022-09-12T14:58:11,
1,s5hymi,s5hymi,t2_7f8bjbav,Itchy-Throat-4779,I am boycotting PLUTOTV for one month for infi...,Post,0,0,0,2022-01-16 18:24:32,2022-09-12T14:58:11,
2,hsy7ovs,s5hymi,t2_9jqle015,TDSinv,Lol what? They don’t make the ads 😂,Comment,5,0,5,2022-01-16 21:09:46,2022-09-12T14:58:11,
...,...,...,...,...,...,...,...,...,...,...,...,...
3,hjs8kup,qommxu,t2_14191a,KyloRenKardashian,Lol this is true. so much nostalgia,Comment,4,0,4,2021-11-08 08:57:21,2022-09-12T14:58:11,
4,hlqvhgi,qommxu,,[deleted],True that,Comment,3,0,3,2021-11-23 08:15:21,2022-09-12T14:58:11,
5,hkennx3,qommxu,t2_aw4mblxc,HaydenMilk,I'm just mad about all the glitches. The conte...,Comment,2,0,2,2021-11-13 01:40:19,2022-09-12T14:58:11,
6,hlvqfnv,qommxu,t2_13fip9,spunjbaf,I really agree. The FAVORITES function makes f...,Comment,2,0,2,2021-11-24 09:05:05,2022-09-12T14:58:11,


## Import des données des 30 plus gros subreddits

In [10]:
# Names of the 30 biggest subreddits, each written with its "r/" prefix.
# Not referenced by any of the visible cells.
top30Subreddits = ["r/announcements", "r/funny", "r/AskReddit", "r/gaming", "r/aww",
                "r/Music", "r/pics", "r/science", "r/worldnews", "r/videos",
                "r/todayilearned", "r/movies", "r/news", "r/Showerthoughts", "r/EarthPorn",
                "r/gifs", "r/IAmA", "r/food", "r/askscience", "r/Jokes",
                "r/LifeProTips", "r/explainlikeimfive", "r/Art", "r/books", "r/mildlyinteresting",
                "r/nottheonion", "r/DIY", "r/sports", "r/blog", "r/space"]

In [89]:
# Scratch check: default string form of the current time (includes microseconds, see output)
str(datetime.now())

'2022-09-14 09:12:28.403485'

In [92]:
from datetime import timezone

# Timestamp of this extraction run. The extraction functions read it as a
# module-level name and store it in the `extraction_utc` columns, so it is
# taken in UTC explicitly: datetime.now() without a timezone returns the local
# time of the machine.
date_extraction = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
print(date_extraction)
# Full run (posts, then comments) on one subreddit
extraction_complete("PlutoTV")

2022-09-14 09:14:17


                                                                                

In [93]:
# Scratch check: current time, to compare with the timestamp printed when the extraction started
datetime.now().strftime('%Y-%m-%d %H:%M:%S')

'2022-09-14 09:15:20'