In [84]:
#IMPORTS AND MISC
import os, sys, requests, re
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import StringType
import pyspark.sql.functions as f
from bs4 import BeautifulSoup

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
#END IMPORTS AND MISC

#Global variables

#Spark entry point: getOrCreate() returns the session for this application name,
#building it only when none is active yet
spark = (
    SparkSession.builder
    .appName("PySpark data aquisition")
    .getOrCreate()
)

#Github page that lists the contents of the repository's data folder
data_url = "https://github.com/umbrae/reddit-top-2.5-million/tree/master/data"

#Fetching csv files from github, saving and loading them using SparkFiles

#As example, for each subreddit take first post and save it in another dataframe

#HTTP request to the github folder page. The timeout keeps the cell from hanging
#forever on a stalled connection, and raise_for_status() stops us from parsing
#an error page as if it were the file listing.
result = requests.get(data_url, timeout=30)
result.raise_for_status()

#Using BeautifulSoup to parse the response and pick every element whose
#title attribute ends in ".csv" (raw string so "\." reaches the regex engine as-is)
soup = BeautifulSoup(result.text, 'html.parser')
csvfiles = soup.find_all(title=re.compile(r"\.csv$"))

#The text of each matched element is used as the file name.
#dict.fromkeys removes duplicates while keeping the first-seen order.
filenames = list(dict.fromkeys(tag.get_text() for tag in csvfiles))
print(filenames)

#Location of the raw csv files; defined once so every read uses the same base
raw_base_url = "https://raw.githubusercontent.com/umbrae/reddit-top-2.5-million/refs/heads/master/data/"


def _read_remote_csv(name):
    """Register one csv of the repository with SparkFiles and load it as a DataFrame.

    name: file name inside the data folder, e.g. an entry of ``filenames``.
    Returns the DataFrame read with a header row, an inferred schema and
    multi-line field support.
    """
    spark.sparkContext.addFile(raw_base_url + name)
    return spark.read.csv("file:///"+SparkFiles.get(name), header=True, inferSchema=True, multiLine=True)


#Without at least one file there is nothing to infer a schema from
if not filenames:
    raise ValueError("No csv files were found at " + data_url)

#Infer schema from first file, without it we cannot create an empty dataframe with specified schema
df = _read_remote_csv(filenames[0])

#Iterate through the files and save in a dataframe only the first entry from each
final_df = spark.createDataFrame([], schema=df.schema)
final_df = final_df.drop('over_18')
final_df.printSchema()
for file in filenames:
    df = _read_remote_csv(file)
    #limit(1) keeps the first row as a DataFrame inside Spark; unlike head(),
    #which returns None for an empty DataFrame, it also copes with a file
    #that has a header but no data rows (it simply contributes nothing)
    temp = df.limit(1)
    #input sanitization: cast every column to string in a single projection
    #(presumably so that types inferred differently per file do not clash in the union)
    temp = temp.select([col(c).cast(StringType()).alias(c) for c in temp.columns])
    temp = temp.drop('over_18')
    #union matches columns by position, so every file is expected to share the column layout
    final_df = final_df.union(temp)

final_df.show()

['0x10c.csv', '2007scape.csv', '24hoursupport.csv', '30ROCK.csv', '3DS.csv', '3Dprinting.csv', '3FrameMovies.csv', '3amjokes.csv', '45thworldproblems.csv', '49ers.csv', '4chan.csv', '4x4.csv', '52weeksofcooking.csv', '90daysgoal.csv', '90sHipHop.csv', '90scartoons.csv', '911truth.csv', 'ABraThatFits.csv', 'ADHD.csv', 'AFL.csv', 'AFOL.csv', 'AMA.csv', 'AbandonedPorn.csv', 'AcademicPhilosophy.csv', 'Accounting.csv', 'AdPorn.csv', 'AdrenalinePorn.csv', 'AdvancedFitness.csv', 'AdvancedRunning.csv', 'Advice.csv', 'AdviceAnimals.csv', 'AdviceAtheists.csv', 'AerialPorn.csv', 'AgriculturePorn.csv', 'AirForce.csv', 'AlbumArtPorn.csv', 'AlienBlue.csv', 'AlisonBrie.csv', 'AllThingsTerran.csv', 'Alot.csv', 'Alternativerock.csv', 'AmIFreeToGo.csv', 'AmateurRoomPorn.csv', 'AmericanHorrorStory.csv', 'AmericanPolitics.csv', 'Ameristralia.csv', 'Anarchism.csv', 'AnarchistNews.csv', 'Anarcho_Capitalism.csv', 'Android.csv', 'AndroidGaming.csv', 'AndroidQuestions.csv', 'AngieVaronaLegal.csv', 'AnimalCross