# InsTAP

<img src="http://store-images.s-microsoft.com/image/apps.31997.13510798887167234.6cd52261-a276-49cf-9b6b-9ef8491fb799.30e70ce4-33c5-43d6-9af1-491fe4679377" alt="Drawing" style="width: 200px;" align="center" />

Il progetto **InsTAP** ha come obiettivo quello di analizzare diversi profili Instagram e valutare la positività o negatività dei commenti presenti nei vari post.

Coded by Gianluca Di Franco e Rosario Amantia

[Github Project](https://github.com/rosarioamantia/insTAP)

<center><img src="images/tap_flow.png" width="800" height="800"</center>
   

# Data Ingestion

La sorgente è ovviamente il Social Network Instagram. Per recuperare i dati di cui avevamo bisogno, abbiamo fatto uso di un pacchetto chiamato **Instaloader**.

<center><img src="images/instaloader_ko.jpg"></center>

## Producer

Al fine di generare ed inviare i dati da Instagram abbiamo programmato un producer basato sul pacchetto instaloader citato in precedenza.

Il producer si occuperà di recuperare ed inviare le seguenti informazioni contenute all'interno dei post degli utenti in esame: 
- utente che crea il post
- Immagine 
- descrizione
- data di creazione
- coordinate
- commenti 

pip install instaloader

~~pip install instaloader~~

pip install --no-cache-dir git+https://github.com/IL2C/instaloader.git@login-fix

```
POSTS_LIMIT = int(os.getenv("POSTS_LIMIT", '10'))
COMMENTS_LIMIT = int(os.getenv("COMMENTS_LIMIT", '20'))
INSTA_USER = os.getenv("INSTA_USER", "<user_here>")
INSTA_PASS = os.getenv("INSTA_PASS", "<pass_here>")
USERS_TO_WATCH = os.getenv("USERS_TO_WATCH", "matteorenzi,giorgiameloni").split(",")
LOGSTASH_URL = "http://logstash:9700"

insta = instaloader.Instaloader()
insta.login(INSTA_USER, INSTA_PASS)
for user in USERS_TO_WATCH:
    posts = instaloader.Profile.from_username(insta.context, user).get_posts()

    for i, post in enumerate(posts):
        if i == POSTS_LIMIT:
            break
        comments = post.get_comments()
        for index, comment in enumerate(comments):
            if index == COMMENTS_LIMIT:
                break

            data = {
                'message_id': index,
                'post_id': f'{post.owner_username}_{i}',
                'user': post.owner_username,
                'comment': comment.text,
                'caption': post.caption,
                'image': post.url,
                'timestamp': str(post.date_local),
                'likes': post.likes,
                'lat': post.location.lat if post.location else None,
                'lng': post.location.lng if post.location else None
            }
            print(str(data))
            sleep(1)
            x = requests.post(LOGSTASH_URL, json=data, timeout=5)
            
```

# Logstash

Logstash è uno strumento in grado di raccogliere, modificare e inoltrare dati provenienti da sorgenti diverse verso una o più destinazioni.
Il file di configurazione di logstash è formato da tre sezioni:
- **input**: ricevere/recuperare le informazioni;
- **filter**: rimuovere/aggiungere/modificare campi;
- **output**: inviare i dati alla destinazione. 

<img src="images/logstash.jpg" width="1200" height="1200">

Nel nostro caso, Logstash recupera i dati inviati dalla sorgente alla porta 9700 e li invia al broker di messaggi Kafka.

### il nostro .conf
```
input {
 http {
    port => "9700"
    host => "0.0.0.0"
  }
}

filter {
}

output {
  kafka {
    codec => json
    topic_id => "instap"
    bootstrap_servers => "http://broker:9092"
  }
}
```

<center><img src="images/logstash-explained.jpg" width="350" height="350">
    </center>

# Apache Kafka

![kafka.png](images/kafka.png)

Apache Kafka è una piattaforma distribuita basata sui messaggi e sul modello publish-subscribe con lo scopo di gestire streaming di dati.

Kafka è ideale per:
- Scambi di dati affidabili tra componenti diversi
- Gestione dei carichi di lavoro
- Streaming in tempo reale per l'elaborazione dei dati
- Supporto per la riproduzione di dati/messaggi

Concetti chiave di Kafka:
- **Producer**: colui che invia i dati. Il producer definisce anche su quale topic dovrà arrivare il messaggio.
- **Topic**: argomento del messaggio. 
    - i producers scelgono su quale topic devono inviare i messaggi;
    - i consumers scelgono su quale topic leggere i messaggi. 
- **Partizione**: struttura dati all'interno del topic in cui vengono scritti i messaggi. Ogni topic ha 1 o più partizioni.
- **Consumer**: colui che legge i messaggi. Ogni consumer appartiene ad un gruppo.


<center><img src="images/consumer-group-kafka.png" width="1200" height="1200"></center>

All'interno del nostro progetto, Kafka è così suddiviso:
- **Zookeper**: servizio centralizzatto per il mantenimento di configurazioni e denominazioni; 
- **Broker**: Cuore di Kafka, gestisce il flusso di dati proveniente da ElasticSearch;
- **Init-Kafka**: Script per l'inizializzazione di topic, partition..
- **Kafka-Ui**: interfaccia Web relativa a Kafka.

<center><img src="images/spark-logo.png" width="330px" height="126px"> &nbsp;</center>

<p>Apache Spark è un framework open source per il calcolo distribuito che esegue le sue elaborazioni in memoria per migliorare le prestazioni delle applicazioni che analizzano Big Data.</p>

Si tratta di uno strumento estremamente veloce:
- In memoria
- Lazy
- Semplicità di utilizzo


<br>
<br>
<center><img src="https://spark.apache.org/images/spark-stack.png"></center>

<p>
Per l'allenamento del modello è stato utilizzato PySpark ed in particolare la libreria Spark MLlib utilizzando la funzione "LogisticRegression" all'interno di una pipeline di machine learning.
</p>

```
#tokenize the Instagram comments text    
stage_1 = RegexTokenizer(inputCol= 'comment' , outputCol= 'tokens', pattern= '\\W')

#remove the stop words
stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_words')

#create a word vector of size 50
stage_3 = Word2Vec(inputCol= 'filtered_words', outputCol= 'vector', vectorSize= 50)

#Logistic Regression Model
model = LogisticRegression(featuresCol= 'vector', labelCol= 'positive')

# setup the pipeline
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])

dataset = spark.read.csv('../tap/spark/dataset/social_training_set.csv',
                         schema=training_set_schema,
                         header=True,
                         sep=',')

#trained model
pipelineFit = pipeline.fit(dataset)

```

quando avviene la chiamata pipeline.fit(), viene adattato il modello al training set di input e gli stage vengono eseguiti in ordine per ogni record del dataset

<center><img src="images/before_training.jpg" alighn="center"> </center>

<center><img src="images/after_training_1.jpg"></center>


Una volta ottenuto il modello allenato, il passo successivo è quello di leggere i dati inseriti in tempo reale da Kafka nel topic "instap_topic".
<br>
<br>
Per leggere il flusso di dati viene usata la libreria Spark Structured Streaming, una libreria per processare i dati in streaming all'interno della libreria Spark SQL
```

#Create DataFrame representing the stream of input lines from Kafka
df = spark.readStream.format('kafka')
    .option('kafka.bootstrap.servers', kafkaServer) \
        .option('subscribe', "instap_topic"). \
            option("startingOffsets","earliest").load()
```
<center><img src="images/kafka_spark.png"></center>

<p>
   A questo punto, tramite un Output Structured Streaming, si elaborano i dati in stream applicando il modello sui dati per ogni micro-batch ed effettuando una predizione sul commento che verrà aggiunta in una colonna "prediction" nel DataFrame risultante<br>
</p>

```
df.writeStream \
    .foreachBatch(process_batch) \
    .start() \
    .awaitTermination()
```
<center><img src="images/spark_ml.png"></center>

Una volta processati e analizzati, i dati vengono inviati da PySpark verso Elasticsearch all'interno dell'index "instap_index" tramite il client Python

```
es = Elasticsearch(hosts=elastic_host, verify_certs = False)
resp = es.index(index = "instap_index", id = id, document = alayzed_data)

```
<br>
<img src="images/spark_es.png" align="center">


<center><img src="images/elasticsearch.png" align="center" width="400px" height="400px"></center>
<p>
    Elasticsearch è un motore per la ricerca e l'analisi di dati: è in grado di gestire tutte le tipologie di dato (testuale, numerico, geospaziale, strutturato e non strutturato) ed è conosciuto per la sua natura distribuita, velocità e scalabilità.
<br>
<br>
Elasticsearch è stato utilizzato per indicizzare e memorizzare i dati precedentemente elaborati e inviarli a Kibana per visualizzarli.
</p>
<b>Nota: </b>una volta memorizzati, i dati  possono essere recuperati in modo veloce ed efficiente attraverso semplici chiamate REST o le apposite API

<p><b>Le conferme che rendono felici<b></p>
<img src="images/es_log.png">

<img src="images/kibana.png" width="430px" height="226px">

Kibana è una UI utilizzata per visualizzare ed analizzare i dati di Elasticsearch con l'ausilio di grafici.
<br>
<br>
Di seguito vengono riportati alcuni grafici mostrati nella dashboard. <br>

<center><img src="images/Homer-kibana.jpg"></center>

![kibana-dashboard.jpg](images/all-dashboard.jpg)

![kibana-post-prediction.jpg](images/kibana-post-prediction.jpg)

![kibana-donut.jpg](images/kibana-donut.jpg)