In [1]:
%pip install pyspark==3.2.0, requests

Note: you may need to restart the kernel to use updated packages.


In [4]:
import os

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import *
from requests import get

In [5]:
# Local path of the PostgreSQL JDBC driver jar; Spark loads it through the `spark.jars` setting.
postgres_jar: str = "/usr/local/spark/jars/postgresql-42.3.8.jar"

In [6]:
# Start the SparkSession (getOrCreate returns the running one if it exists) with the
# PostgreSQL JDBC driver jar on the classpath so DataFrames can be written over JDBC.
spark = SparkSession.builder \
    .appName("An Spark application") \
    .config("spark.jars", postgres_jar) \
    .getOrCreate()

25/01/23 00:05:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/23 00:05:12 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [7]:
# Connection settings for the PostgreSQL downstream database. Each value can be supplied via an
# environment variable so credentials do not have to live in the notebook; the literals below
# are only fallbacks used when the corresponding variable is unset.
jdbc_url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://postgres-db:5432/downstream")
db_properties = {
    "user": os.environ.get("POSTGRES_USER", "admin"),
    # NOTE(review): the fallback password is a hardcoded secret; prefer setting POSTGRES_PASSWORD.
    "password": os.environ.get("POSTGRES_PASSWORD", "secret"),
    "driver": "org.postgresql.Driver",
}

In [8]:
# Column layout expected in PS4_GamesSales.csv: descriptive columns first, then one numeric
# column per region plus a global total (presumably sales figures; units not stated here).
# Every column is nullable.
schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in (
        ("Game", StringType()),
        ("Year", IntegerType()),
        ("Genre", StringType()),
        ("Publisher", StringType()),
        ("North America", DoubleType()),
        ("Europe", DoubleType()),
        ("Japan", DoubleType()),
        ("Rest of world", DoubleType()),
        ("Global", DoubleType()),
    )
])

## Creando la tabla de videojuegos en PostgreSQL

In [9]:
# Apply the explicit `schema` instead of letting every column default to string, so Year and
# the numeric columns reach PostgreSQL with proper types. With a user-supplied schema the
# header row is skipped and columns are taken by position.
df = (
    spark.read
    .option("header", True)
    .schema(schema)
    .csv("/home/jovyan/data/PS4_GamesSales.csv")
)

In [10]:
# Persist the games DataFrame to PostgreSQL, replacing any existing "Games" table.
(
    df.write
    .mode("overwrite")
    .jdbc(url=jdbc_url, table="Games", properties=db_properties)
)

## ETL con Spark
Hagamos una ETL usando Spark.
1. Extraemos los datos de nuestro upstream: en esta versión, la News API (newsapi.org) en lugar de MongoDB.
2. Realizamos transformaciones sobre los datos (pendiente de definir).
3. Cargamos los datos a PostgreSQL, nuestro downstream.

In [78]:
from dataclasses import dataclass, field
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf

In [57]:
@dataclass
class RequestConf:
    """Parameters of one NewsAPI ``/v2/everything`` request, plus the derived request URL.

    Construct positionally: ``RequestConf(date_from, date_to, search_term[, api_key])``.
    The double-underscore field names are name-mangled by Python (for example to
    ``_RequestConf__date_from``), so keyword construction must use the mangled names.
    ``url`` is not an init argument; it is computed in ``__post_init__``.
    """
    __date_from: str    # sent as the ``from`` query parameter
    __date_to: str      # sent as the ``to`` query parameter
    __search_term: str  # sent as the ``q`` query parameter
    # NOTE(review): hardcoded API key; move it to an environment variable or secrets store and rotate it.
    __api_key: str = "4f1e6cf579d44b7db3fe6d6505c54b2b"
    url: str = field(init=False)

    def __post_init__(self):
        """Build ``self.url`` with every query value percent-encoded."""
        from urllib.parse import quote, urlencode

        # Encoding keeps characters such as '&', '#', '+' or spaces in the search term from
        # breaking the query string. quote_via=quote encodes spaces as %20 rather than '+'.
        query = urlencode(
            {
                "q": self.__search_term,
                "from": self.__date_from,
                "to": self.__date_to,
                "apiKey": self.__api_key,
            },
            quote_via=quote,
        )
        self.url = f"https://newsapi.org/v2/everything?{query}"

In [32]:
@dataclass
class LoadConf:
    """Settings for writing a DataFrame to a database table over JDBC.

    ``mode`` is validated case-insensitively on construction; only "overwrite" and
    "append" are accepted.
    """
    url: str          # JDBC connection URL
    table: str        # destination table name
    mode: str         # Spark save mode
    properties: dict  # JDBC connection properties (e.g. user, password, driver)

    def __post_init__(self):
        supported_modes = ("overwrite", "append")
        assert self.mode.lower() in supported_modes, f"Mode {self.mode} not supported yet."

In [98]:
def get_news(req_conf: RequestConf, timeout: float = 30.0) -> list:
    """Fetch the articles for ``req_conf`` from NewsAPI.

    Args:
        req_conf: request configuration; its ``url`` is fetched with an HTTP GET.
        timeout: seconds to wait for the server before giving up. Without a timeout a stalled
            connection would block the caller (e.g. a Spark worker running the UDF) forever.

    Returns:
        The value of the ``articles`` key of the JSON response.

    Raises:
        Exception: if the response has no ``articles`` key. The message includes the
            response's ``message`` field when there is one.

    NOTE(review): a later cell redefines ``get_news(url, search_term, date_from, api_key)``;
    after running all cells top to bottom that version shadows this one. Rename one of them.
    """
    res = get(req_conf.url, timeout=timeout).json()
    try:
        return res["articles"]
    except KeyError:
        api_message = res.get("message", "no message in response")
        raise Exception(
            f"Not found key 'articles' in response from news API. API message: {api_message}"
        ) from None

def register_udf(req_conf: RequestConf):
    """Build a zero-argument Spark UDF that fetches the NewsAPI articles for ``req_conf``.

    The UDF returns an array of structs with the fields title, description, url,
    publishedAt and content. Only those keys are taken from each article dict.
    ``publishedAt`` arrives as an ISO-8601 string, but a ``TimestampType`` field must be
    filled with a ``datetime``, so it is parsed into a timezone-aware ``datetime`` first.

    NOTE(review): the UDF calls the module-level ``get_news`` that is in scope when the UDF is
    used. A later cell redefines ``get_news`` with a four-argument signature, which would make
    this single-argument call fail after running all cells. Rename one of the two functions.
    """
    from datetime import datetime

    res_schema = ArrayType(StructType([
        StructField("title", StringType()),
        StructField("description", StringType()),
        StructField("url", StringType()),
        StructField("publishedAt", TimestampType()),
        StructField("content", StringType())
    ]))

    def _to_timestamp(value):
        # Values look like "2025-01-19T00:24:36Z". The trailing "Z" is rewritten as an
        # explicit UTC offset so datetime.fromisoformat also accepts it on Python < 3.11.
        if not value:
            return None
        return datetime.fromisoformat(value.replace("Z", "+00:00"))

    def _fetch_articles():
        # Runs on the Spark worker: one API request, with publishedAt converted per article.
        return [
            {**article, "publishedAt": _to_timestamp(article.get("publishedAt"))}
            for article in get_news(req_conf)
        ]

    return udf(_fetch_articles, res_schema)
    

In [112]:
class Pipeline:
    """News ETL: extract articles through a Spark UDF, then load them into a JDBC table."""

    class Extract:
        """Extract step: turns a ``RequestConf`` into a DataFrame of articles."""

        def __init__(self, spark: SparkSession, req_conf: RequestConf):
            assert isinstance(req_conf, RequestConf), "Passed configurations must be instance of RequestConf"
            self.spark = spark
            self.req_conf = req_conf

        def run(self) -> DataFrame:
            """Return the fetched articles with one row per article.

            The columns are the struct fields of the UDF schema built by ``register_udf``.
            """
            udf_get_news = register_udf(self.req_conf)
            # The UDF takes no columns, so it runs against a single placeholder row. One row
            # means one API request each time the result is evaluated. Without caching, every
            # Spark action on the returned DataFrame repeats the request.
            row = Row("empty")
            input_df = self.spark.createDataFrame([row("GET")])
            # `inline` turns the array<struct> column into one row per element and one column
            # per struct field.
            return (
                input_df
                .select(udf_get_news().alias("articles"))
                .selectExpr("inline(articles)")
            )

    class Load:
        """Load step: writes a DataFrame to the JDBC table described by a ``LoadConf``."""

        def __init__(self, load_conf: LoadConf):
            assert isinstance(load_conf, LoadConf), "Passed configurations must be instance of LoadConf"
            self.load_conf = load_conf

        def run(self, df: DataFrame) -> None:
            """Write ``df`` with the configured URL, table, save mode and connection properties."""
            assert isinstance(df, DataFrame)
            df.write.jdbc(
                url=self.load_conf.url,
                table=self.load_conf.table,
                mode=self.load_conf.mode,
                properties=self.load_conf.properties,
            )

    def __init__(self, req_conf: RequestConf, load_conf: LoadConf):
        """Wire both steps to a SparkSession (getOrCreate reuses the running session if any)."""
        spark = (
            SparkSession
            .builder
            .appName("An Spark application")
            .config("spark.jars", postgres_jar)
            .getOrCreate()
        )
        self.extract = Pipeline.Extract(spark, req_conf)
        self.load = Pipeline.Load(load_conf)

    def run(self) -> DataFrame:
        """Extract the articles, write them with the load step, and return the extracted DataFrame."""
        df = self.extract.run()
        self.load.run(df)
        return df

In [106]:
def main(*args, date_from: str = "2025-01-19", date_to: str = "2025-01-20"):
    """Run the news ETL once per search term, writing every result into the ``news`` table.

    The first term overwrites the table and each later term appends to it. Previously every
    term used "overwrite", so a multi-term run kept only the last term's articles.

    Args:
        *args: NewsAPI search terms, processed in order.
        date_from: start of the publication window (keyword-only).
        date_to: end of the publication window (keyword-only).

    Connection settings come from the module-level ``jdbc_url`` and ``db_properties`` so they
    are defined in a single place.
    """
    for position, search_term in enumerate(args):
        req_conf = RequestConf(date_from, date_to, search_term)
        save_mode = "overwrite" if position == 0 else "append"
        load_conf = LoadConf(jdbc_url, "news", save_mode, db_properties)
        pipeline = Pipeline(req_conf, load_conf)
        pipeline.run()
    

In [113]:
# Search terms fed to the news ETL, processed in the order listed.
search_terms = ("trump",)
main(*search_terms)

Traceback (most recent call last):
  File "/opt/conda/lib/python3.12/site-packages/pyspark/serializers.py", line 437, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.12/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/conda/lib/python3.12/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.12/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 653, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.12/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 526, in _function_reduce
    return self._dynamic_function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.12/site-packages/pyspark/clou

PicklingError: Could not serialize object: IndexError: tuple index out of range

In [26]:
def get_news(url: str, search_term: str, date_from: str, api_key: str, timeout: float = 30.0):
    """Fetch NewsAPI results by filling the three ``{}`` placeholders of ``url``.

    Args:
        url: template with placeholders for search term, start date and API key, in that order.
        search_term: free-text query.
        date_from: start date of the search window.
        api_key: NewsAPI key.
        timeout: seconds to wait for the HTTP response before giving up.

    Returns:
        The decoded JSON response body.

    NOTE(review): this redefines the single-argument ``get_news(req_conf)`` from an earlier cell,
    which ``register_udf`` relies on. Running all cells top to bottom leaves this version in
    scope. Rename one of the two functions.
    """
    from urllib.parse import quote

    # Percent-encode each value so characters such as '&' or '#' cannot break the query string.
    encoded = [quote(str(value), safe="") for value in (search_term, date_from, api_key)]
    return get(url.format(*encoded), timeout=timeout).json()

In [34]:
news_url = "https://newsapi.org/v2/everything?q={}&from={}&sortBy=publishedAt&apiKey={}"
# No cell assigns API_KEY, so on a fresh kernel this cell raised NameError. Read the key from
# the environment instead of relying on leftover kernel state.
API_KEY = os.environ.get("NEWS_API_KEY")
if not API_KEY:
    raise RuntimeError("Set the NEWS_API_KEY environment variable before running this cell.")
get_news(news_url, "elon musk", "2024-12-20", API_KEY)["articles"][0:3]

[{'source': {'id': None, 'name': 'Newsbreak.com'},
  'author': 'Brett Rowland | The Center Square',
  'title': "Report: Federal government can't fully account for its 'unsustainable' spending - NewsBreak",
  'description': "(The Center Square) – A Congressional watchdog says it is again unable to determine if the federal government's financial statements are reliable. The fed",
  'url': 'https://www.newsbreak.com/share/3767247761179-report-federal-government-can-t-fully-account-for-its-unsustainable-spending?_f=app_share&pd=0GOkyYtn&lang=en_US&send_time=1737245806&trans_data=%7B%22platform%22%3A0%2C%22cv%22%3A%2225.3.0.29%22%2C%22languages%22%3A%22en%22%7D&sep=ns_foryou_blend_exp_25q1-v2%2Cns_foryou_model_exp_25q1-v2%2Cns_foryou_rank_exp_25q1-v3%2Cns_foryou_recall_exp_25q1-v5%2Cns_push_exp_rt_bucketv12-v1&s=i2',
  'urlToImage': 'https://img.particlenews.com/img/id/3CHFvx_0yFyNcEn00',
  'publishedAt': '2025-01-19T00:24:36Z',
  'content': "(The Center Square) A Congressional watchdog say