# Background
Il progetto consiste in una pipeline il cui fine ultimo (per ora) è quello di classificare i meme rispetto all'argomento trattato.  
Si è scelto di classificare i meme in quattro classi (Sports, Intrattenimento, Politica, Altro).  

Task simili sono di vitale interesse per i social media, per esempio recentemente [Instagram ha introdotto una policy](https://about.instagram.com/it-it/blog/announcements/continuing-our-approach-to-political-content-on-instagram-and-threads) secondo cui i post che contengono riferimenti a politica, pubblicati dagli account che l'utente non segue, non verranno visualizzati di default, a meno che l'utente stesso non cambi un flag nelle impostazioni (opt-in).  


![instagram_policy](slides/imgs/instagram_policy.webp)  

### Dataset
Il dataset è costituito da meme provenienti da reddit. Ho scelto questo social media in quanto ogni post su reddit appartiene ad un sub-reddit che definisce una specifica area di interesse. Per esempio, tutti i meme del subreddit [r/PoliticalMemes](https://www.reddit.com/r/PoliticalMemes/) parlano certamente di politica. In altre parole, ogni post ha una etichetta implicita data dal subreddit di appartenenza.
Come anticipato, ho scelto di categorizzare i meme in quattro categorie (Sports, Intrattenimento, Politica, Altro), il seguente codice dichiara un dizionario per mappare i nomi dei rispettivi subreddit ai nomi delle classi che ho scelto.

### Labels
```python
categories_id = {
    "sport": 0,
    "ent": 1,
    "pol": 2,
    "oth": 3
}
# categories_names = {0: "sport", 1: "ent", 2: "pol", 3: "oth"} ovvero categories_id invertito
categories_names = {v: k for k, v in categories_id.items()}
```


### Per ogni subreddit la sua label (e viceversa)
```python
subreddit_categories = {
    "Animemes": categories_id["ent"],
    "GymMemes": categories_id["sport"],
    "RelationshipMemes": categories_id["oth"],
    "PoliticalCompassMemes": categories_id["pol"],
    "PhilosophyMemes": categories_id["oth"],
    "CollegeMemes": categories_id["oth"],
    "HistoryMemes": categories_id["oth"],
    "TheRightCantMeme": categories_id["pol"],
    "AnimalMemes": categories_id["oth"],
    "moviememes": categories_id["ent"],
    "tvmemes": categories_id["ent"],
    "musicmemes": categories_id["ent"],
    "gamingmemes": categories_id["ent"],
    "PoliticalHumour": categories_id["pol"],
    "PoliticalMemes": categories_id["pol"],
    "ScienceHumour": categories_id["oth"],
    "soccermemes": categories_id["sport"],
    "footballmemes": categories_id["sport"],
    "Nbamemes": categories_id["sport"],
}
```

### Python + Reddit = PRAW <3
Verrà utilizzata la libreria PRAW (The Python Reddit API Wrapper) per interfacciarsi con le API di Reddit
![python](./slides/imgs/python.webp) ![praw](./slides/imgs/praw.png) ![reddit](./slides/imgs/reddit.png)

# Panoramica del progetto

<img src="slides/imgs/project_overview_v2.png" alt="project_structure" style="max-width: 100%; max-height: 90vh; height: auto; display: block; margin: 0 auto;">



### First thing first: data ingestion
L'interfacciamento con reddit sarà svolto da uno script in python che fa uso della libreria PRAW (Python Reddit API Wrapper) il cui compito sarà quello di fetchare i post in streaming dai subreddits prescelti e inoltrarli a logstash mediante interfaccia HTTP.
```python
def get_posts(reddit, subreddits, t):
    for submission in reddit.subreddit(subreddits).stream.submissions():
        yield submission
```

### data ingestion
Ho scelto logstash per la semplicità d'uso e la configurazione semplice e diretta.

```conf
input {
  http {
    id => "tap_http_in"
  }
}

output {
  kafka {
    codec => json
    topic_id => "reddit_posts"
    bootstrap_servers => "kafkaserver:9092"
  }
  # stdout { codec => rubydebug }
}
```



![logstash_clown](./slides/imgs/logstash_clown.jpg)  


### how much logstash http input plugin is cursed
```conf
input {
  http {
    id => "tap_http_in"
    port => 8081
    # https://github.com/logstash-plugins/logstash-input-http/issues/155
    additional_codecs => {}
    codec => json { target => "[document]" }
  }
}
```

### filter perchè less is more
```conf
filter {
  # Mantieni solo il campo document
  mutate {
    remove_field => ["url", "@version", "@timestamp", "user_agent", "event", "http", "host"]
  }
}
```

### finally output to kafka
```conf
# https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html#plugins-outputs-kafka-message_key
output {
  kafka {
    codec => json
    topic_id => "reddit-posts"
    bootstrap_servers => "kafkaserver:9092"
    message_key => "%{[document][id]}"
  }
  # stdout { codec => rubydebug }
}
```

![logstash clown 2](./slides/imgs/logstash_clown_2.jpg)

### What about images?

### Data streaming: kafka
```yaml
    zookeeper:
        image: tap:kafka
        container_name: kafkaZK
        environment:
            - KAFKA_ACTION=start-zk
        networks:
            tap:
                ipv4_address: 10.0.100.22
        profiles: [ "kafka", "ingestion","logstash", "pipeline"]
    kafkaServer:
        image: tap:kafka
        container_name: kafkaServer
        environment:
            - KAFKA_ACTION=start-kafka
        networks:
            tap:
                ipv4_address: 10.0.100.23
        profiles: [ "kafka", "ingestion","logstash", "pipeline"]
        depends_on:
            - zookeeper

    topic:
        image: tap:kafka
        container_name: kafkaTopic1
        environment:
            - KAFKA_ACTION=create-topic
            - KAFKA_PARTITION=2
            - KAFKA_TOPIC=reddit-posts
        networks:
            tap:
        depends_on:
            - zookeeper
            - kafkaServer
        profiles: [ "kafka", "ingestion","logstash", "pipeline"]
```

![kafka_meme](./slides/imgs/kafka_1.jpeg)

### Data processing: Spark
Spark si occupa della classificazione dei meme, ho suddiviso il codice del training del modello dal codice che processa i dati in streaming, vediamone alcuni pezzi.

### Data processing: Spark - modello di classificazione
```python
df = df.withColumn(
    "all_text", concat_ws(" ", df.title, df.selftext, df.ocr_text, df.caption_text)
)
tokenizer = Tokenizer(inputCol="all_text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=2**16)
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(maxIter=10, tol=1e-6, fitIntercept=True)
ovr = OneVsRest(classifier=lr)
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, ovr])
```


### Data processing: Spark - streaming
Il codice legge lo stream proveniente da kafka e per ogni batch classifica i meme contenuti in esso, poi scrive il risultato su elasticsearch.
```python
print("Reading stream from kafka...")
# Read the stream from kafka
kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafkaServer)
    .option("subscribe", topic)
    .load()
)
# omissis
df.writeStream.foreachBatch(process).start().awaitTermination()
```

### Data processing: Spark - streaming
```python
def process(batch_df: DataFrame, batch_id: int):
    print("Processing batch: ", batch_id)
    df = batch_df
    # omissis
    df = df.withColumn(
        "all_text", concat_ws(" ", df.title, df.selftext, df.ocr_text, df.caption_text)
    )

    df1 = classificationModel.transform(df)
    # omissis
    print("Processed batch: ", batch_id, "writing to elastic")
    df1.write.format("es").mode("append").option("es.mapping.id", "id").save(
        elastic_index
    )
```

### Data indexing: elasticsearch - mappings are important!
Ho creato un ulteriore container che si occupa di creare un indice su elasticsearch e di impostare i mappings per i dati che gli manderò. Quest'ultimo step è stato davvero importante perchè altrimenti su kibana risulta impossibile effettuare alcune visualizzazioni che ho fatto, che richiedevano la tokenizzazione di alcuni campi di testo.
```python
mapping = {
    "settings": {
        "analysis": {
            "analyzer": {
                "custom_analyzer": {
                    "type": "standard",
                    "stopwords": "_english_",  # Rimuove le stopwords in inglese
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "id": {"type": "keyword"},
            "created_timestamp": {
                "type": "date",
                "format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ||epoch_millis",
            },
            "title": {
                "type": "text",
                "fielddata": True,
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256,  # Limita la lunghezza massima del campo indicizzato
                    }
                },
                "analyzer": "standard",
            },
            "selftext": {
                "type": "text",
                "fielddata": True,
                "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
                "analyzer": "custom_analyzer",
            },
            "caption_text": {
                "type": "text",
                "fielddata": True,
                "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
                "analyzer": "custom_analyzer",
            },
            "ocr_text": {
                "type": "text",
                "fielddata": True,
                "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
                "analyzer": "custom_analyzer",
            },
            "score": {"type": "integer"},
            "upvote_ratio": {"type": "float"},
            "subreddit": {"type": "keyword"},
            "img_url": {"type": "keyword"},
            "img_filename": {"type": "keyword"},
            "num_comments": {"type": "integer"},
            "predicted_category": {"type": "keyword"},
            "ground_truth_category": {"type": "keyword"},
            "all_text": {
                "type": "text",
                "fielddata": True,
                "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
                "analyzer": "custom_analyzer",
            },
        }
    },
}
```

### Data visualization: kibana
but first let me take a demo