Setup for Google Colab

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install boto3

In [None]:
import os

# Point JAVA_HOME and SPARK_HOME at the Java 8 install and the unpacked Spark directory.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [None]:
import findspark

# findspark.init() has to run before the pyspark import below: pyspark is not
# pip-installed in this notebook, it is taken from the Spark directory in SPARK_HOME.
findspark.init()

from pyspark.sql import SparkSession

# Local Spark session; "local[*]" is the master URL used for this notebook.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Property used to format output tables better
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

Import libraries

In [None]:
import pyspark
from pyspark.sql.functions import from_json, explode, flatten, collect_list
from pyspark.sql.types import MapType, StringType, StructType, StructField
import requests
import json
import time
import os
import boto3

Set your NewsAPI key below. It is left blank here so that it is not exposed on GitHub.

In [None]:
# Read the NewsAPI key from the NEWSAPI_KEY environment variable so it never has to be
# typed into (and committed with) the notebook. Falls back to the original empty
# placeholder when the variable is not set.
API_KEY = os.environ.get('NEWSAPI_KEY', '')

The NewsAPI Python client would make this part much easier; I used the `requests` library because of the assignment requirements. With the client, the equivalent would be:

api = NewsApiClient(api_key=API_KEY) <br>
api.get_top_headlines(language='en')

Get the English-language sources and convert the response into a Python list of source dicts (each one has an `id`)

In [None]:
# Ask NewsAPI for its English-language sources.
# The timeout keeps the cell from hanging forever on a stalled connection, and
# raise_for_status() surfaces an HTTP error here instead of as a confusing KeyError
# when the body is parsed in the next cell.
en_sources = requests.get(
    'https://newsapi.org/v2/top-headlines/sources',
    params={'language': 'en', 'apiKey': API_KEY},
    timeout=30,
)
en_sources.raise_for_status()

In [None]:
# Parse the JSON body and keep only its "sources" entry.
# Despite the name, later cells index this like a list of per-source dicts.
_sources_payload = json.loads(en_sources.text)
en_sources_dict = _sources_payload['sources']

The schema has to be defined manually: for some sources certain fields contain only `None` values, so their types cannot be inferred automatically.

In [None]:
# Explicit schema for one article record: every field is a nullable string except
# "source", which is a nullable string -> string map.
schema = (
    StructType()
    .add("author", StringType(), nullable=True)
    .add("content", StringType(), nullable=True)
    .add("description", StringType(), nullable=True)
    .add("publishedAt", StringType(), nullable=True)
    .add("source", MapType(StringType(), StringType()), nullable=True)
    .add("title", StringType(), nullable=True)
    .add("url", StringType(), nullable=True)
    .add("urlToImage", StringType(), nullable=True)
)

Run it once with the first source to create a dataframe

In [None]:
# Seed the DataFrame with the top headlines of the first source.
source_id = en_sources_dict[0]['id']
# timeout: do not hang forever on a stalled connection.
# raise_for_status: fail with the HTTP error itself rather than a KeyError on 'articles'.
source_headlines = requests.get(
    'https://newsapi.org/v2/top-headlines',
    params={'sources': source_id, 'apiKey': API_KEY},
    timeout=30,
)
source_headlines.raise_for_status()
source_articles = json.loads(source_headlines.text)['articles']
df = spark.createDataFrame(source_articles, schema=schema)

Run the rest in a loop and add to the dataframe. Every iteration fetches the articles from a specific source and adds it to the dataframe.

In [None]:
# Fetch the top headlines of every remaining source and append them to `df`.
# A source that fails (network error, HTTP error, unexpected payload, schema mismatch)
# is reported and skipped. Previously the union below still ran after a failure and
# re-appended the previous source's `temp_df`, silently duplicating its rows
# (or raising NameError if the very first iteration failed).
for source in en_sources_dict[1:]:
  source_id = source['id']
  try:
    source_headlines = requests.get(
      'https://newsapi.org/v2/top-headlines',
      params={'sources': source_id, 'apiKey': API_KEY},
      timeout=30,  # do not let one stalled request block the whole loop
    )
    source_headlines.raise_for_status()
    source_articles = json.loads(source_headlines.text)['articles']
    temp_df = spark.createDataFrame(source_articles, schema=schema)
  except Exception as e:
    # Best effort: log the failing source and move on without touching `df`.
    print(source_id, '       ', e)
    continue

  df = df.union(temp_df)

In [None]:
# Summary statistics per column (count / mean / stddev / min / max). The counts also
# show how many articles have a non-null value for each field.
df.describe()

summary,author,content,description,publishedAt,title,url,urlToImage
count,1234,1366,1432,1456,1456,1456,1391
mean,,,,,,,
stddev,,,,,,,
min,,,,2018-10-05T19:53:...,"""Negev Summit"" hi...",http://business.f...,http://beta.ems.l...
max,https://www.engad...,“Saturday Night L...,﻿Jalen Reagor﻿ ca...,2022-04-03T19:52:...,‘We have survived...,https://www.wsj.c...,


Explode the nested "source" column and merge the 2 rows about the same article

In [None]:
# Keep every column except the "source" map and explode that map into one
# (key, value) row per map entry.
article_cols = [name for name in df.columns if name != 'source']
df_flat = df.select(*article_cols, explode(df.source))

In [None]:
# Check that the "source" map was replaced by the "key" / "value" columns from explode.
df_flat.printSchema()

root
 |-- author: string (nullable = true)
 |-- content: string (nullable = true)
 |-- description: string (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- urlToImage: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: string (nullable = true)



In [None]:
# Peek at the first 4 exploded rows; an article is repeated once per entry of its
# "source" map.
df_flat.take(4)

[Row(author='TALI ARBEL AP Technology Writer', content='Airlines have canceled more than 3,300 U.S. flights this weekend and delayed thousands more, citing weather in Florida and other issues.\r\nFlightAware, a website that tracks flights, noted major disru… [+1756 chars]', description='', publishedAt='2022-04-03T18:43:40Z', title='Airlines cancel more than 3,300 US flights over weekend', url='https://abcnews.go.com/Business/wireStory/airlines-cancel-3300-us-flights-weekend-83847532', urlToImage='https://s.abcnews.com/images/Travel/WireAP_983cc6f35ce34239a399f1d2b0f51614_16x9_992.jpg', key='name', value='ABC News'),
 Row(author='TALI ARBEL AP Technology Writer', content='Airlines have canceled more than 3,300 U.S. flights this weekend and delayed thousands more, citing weather in Florida and other issues.\r\nFlightAware, a website that tracks flights, noted major disru… [+1756 chars]', description='', publishedAt='2022-04-03T18:43:40Z', title='Airlines cancel more than 3,300 US flights

In [None]:
# Drop the map-key column; only "value" is used by the regrouping step that follows.
df_flat = df_flat.drop('key')

In [None]:
# Group on every column except "value" and gather each article's values into a
# single array column named "source".
grouping_cols = [name for name in df_flat.columns if name != 'value']
df_flat_grouped = (
    df_flat
    .groupBy(*grouping_cols)
    .agg(collect_list('value').alias('source'))
)

In [None]:
# Check that "source" is now an array of strings.
df_flat_grouped.printSchema()

root
 |-- author: string (nullable = true)
 |-- content: string (nullable = true)
 |-- description: string (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- urlToImage: string (nullable = true)
 |-- source: array (nullable = false)
 |    |-- element: string (containsNull = false)



In [None]:
# Peek at the first 4 regrouped rows and their collected "source" arrays.
df_flat_grouped.take(4)

[Row(author='Emily Stewart, Rebecca Heilweil', content='In times of crisis, there is no good; theres only a best course of action, given the circumstances. Is crypto good in the context of Russias invasion of Ukraine? Is it bad? Neutral? Its a hard questi… [+16905 chars]', description='From donations in bitcoin and crypto to Ukraine to concerns Russia might use cryptocurrency to get around sanctions, here’s how crypto is playing a role in the Russia-Ukraine war.', publishedAt='2022-03-01T11:00:00Z', title='Bitcoin and crypto are helping both sides in the Russia-Ukraine conflict', url='https://www.vox.com/recode/22955381/russia-ukraine-bitcoin-donation-war-crypto', urlToImage='https://cdn.vox-cdn.com/thumbor/Vx6YXtcklBjQAcCxzm9fyKqk72E=/0x155:3574x2026/fit-in/1200x630/cdn.vox-cdn.com/uploads/chorus_asset/file/23278239/GettyImages_1237956184.jpg', source=['Recode', 'recode', 'Recode', 'recode', 'Recode', 'recode']),
 Row(author=None, content='It could also shred the forecasts of Fed Chai

In [None]:
# Replace the "source" array with its first two elements as separate columns
# (named "source[0]" and "source[1]" by Spark).
# NOTE(review): this assumes collect_list returned the values in a stable order
# (name first, id second, judging by the sample output); Spark does not guarantee
# that order after a shuffle - confirm.
article_cols = [name for name in df_flat_grouped.columns if name != 'source']
df_flat_grouped = df_flat_grouped.select(
    *article_cols,
    df_flat_grouped.source[0],
    df_flat_grouped.source[1],
)

In [None]:
# Give the two positional source columns readable names.
df_final = (
    df_flat_grouped
    .withColumnRenamed('source[0]', 'source_name')
    .withColumnRenamed('source[1]', 'source_id')
)

In [None]:
# Final check: the first 4 rows should carry source_name / source_id as plain strings.
df_final.take(4)

[Row(author='Emily Stewart, Rebecca Heilweil', content='In times of crisis, there is no good; theres only a best course of action, given the circumstances. Is crypto good in the context of Russias invasion of Ukraine? Is it bad? Neutral? Its a hard questi… [+16905 chars]', description='From donations in bitcoin and crypto to Ukraine to concerns Russia might use cryptocurrency to get around sanctions, here’s how crypto is playing a role in the Russia-Ukraine war.', publishedAt='2022-03-01T11:00:00Z', title='Bitcoin and crypto are helping both sides in the Russia-Ukraine conflict', url='https://www.vox.com/recode/22955381/russia-ukraine-bitcoin-donation-war-crypto', urlToImage='https://cdn.vox-cdn.com/thumbor/Vx6YXtcklBjQAcCxzm9fyKqk72E=/0x155:3574x2026/fit-in/1200x630/cdn.vox-cdn.com/uploads/chorus_asset/file/23278239/GettyImages_1237956184.jpg', source_name='Recode', source_id='recode'),
 Row(author=None, content='It could also shred the forecasts of Fed Chair Jerome Powell and other p

The data is flattened and ready to be exported to 1 csv file

In [None]:
# coalesce(1) collapses the data to a single partition before it is written, with a
# header row, into the "en_headlines" folder.
single_partition = df_final.coalesce(1)
single_partition.write.csv('en_headlines', header=True)

Rename the csv file created under the folder "en_headlines"

In [None]:
# Timestamp (seconds since the epoch) used to give the export a unique name.
ts = time.time()
# Spark picks the file name itself, so look up the first entry ending in "csv".
csv_candidates = [entry for entry in os.listdir('./en_headlines') if entry.endswith('csv')]
csv_name = csv_candidates[0]
os.rename(
    f'./en_headlines/{csv_name}',
    f'./en_headlines/{ts}_headlines.csv',
)

Authenticate and upload to the S3 bucket
(I did not want to give my credit card info, so I have no AWS account to check whether this actually works, but I'm pretty sure it does.)

In [None]:
# Credentials are read from the standard AWS environment variables instead of the
# ACCESS_KEY / SECRET_KEY / SESSION_TOKEN names, which were never defined in this
# notebook (NameError). When a variable is unset, None is passed and boto3 falls back
# to its default credential lookup.
client = boto3.client(
    's3',
    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
    aws_session_token=os.environ.get('AWS_SESSION_TOKEN')
)

# Local CSV produced by the rename step above.
file_name = f'./en_headlines/{ts}_headlines.csv'
# Placeholder bucket name: replace it with a real bucket (S3 bucket names cannot
# contain underscores).
bucket = 's3_bucket'
# Object key inside the bucket, under the "en/" prefix.
object_name = f'en/{ts}_headlines.csv'

# upload_file returns None on success and raises on failure, so `response` is only
# kept for backward compatibility.
response = client.upload_file(file_name, bucket, object_name)