# AQUISIÇÃO E PREPARAÇÃO DE DADOS DE REDES SOCIAIS VIA STREAMING UTILIZANDO APACHE SPARK

<div style="text-align: right">Kleber Porto dos Santos</div>

_____


Notebook para análise dos dados coletados através do Spark.


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("TwitterDataEDA") \
    .getOrCreate()

Carrega o local onde estão armazenados os arquivos criados para o Spark

In [2]:
df = spark.read.csv('../../data/raw',header=False,inferSchema=False)

Mostra uma prévia do DataFrame carregado.

truncate = True para que as linhas não fiquem muito compridas e ilegíveis

In [3]:
df.show(truncate=True)

+-------------------+---------------+--------------------+--------------------+---+
|                _c0|            _c1|                 _c2|                 _c3|_c4|
+-------------------+---------------+--------------------+--------------------+---+
|1411686882283511816|         lugwno|Sun Jul 04 14:02:...|RT @mckatiaafiel:...|140|
|1411686899559940105|     Propalando|Sun Jul 04 14:02:...|@nildaalcarinque ...| 72|
|1411686906165968900|        biagfig|Sun Jul 04 14:02:...|RT @DiogoGTB: Pes...|139|
|1411686921147912195|  valmirconsoni|Sun Jul 04 14:02:...|RT @caiocopolla: ...|140|
|1411686933387001857|  franciscaa_99|Sun Jul 04 14:02:...|RT @Kmbitches: Ta...|140|
|1411686945177079814|  Djasantastico|Sun Jul 04 14:02:...|RT @_makavelijone...|140|
|1411686954807222278|  AurineteTiago|Sun Jul 04 14:02:...|RT @senadorhumber...|139|
|1411686885186052100|     AngeloFF04|Sun Jul 04 14:02:...|RT @Inesszinhaa: ...| 70|
|1411686900939755523|        dehta_s|Sun Jul 04 14:02:...|RT @cinefilo_K: g.

Renomeando as colunas e modificando os tipos

In [4]:
from pyspark.sql.types import IntegerType

In [5]:
df.columns

['_c0', '_c1', '_c2', '_c3', '_c4']

In [8]:
df = df.withColumnRenamed('_c0','User_ID') \
        .withColumnRenamed('_c1','User_Name')   \
        .withColumnRenamed('_c2', 'Date') \
        .withColumnRenamed('_c3', 'Tweet') \
        .withColumnRenamed('_c4', 'Length')

In [9]:
df = df.withColumn("Length", df["Length"].cast(IntegerType()))

In [10]:
df.show()

+-------------------+---------------+--------------------+--------------------+------+
|            User_ID|      User_Name|                Date|               Tweet|Length|
+-------------------+---------------+--------------------+--------------------+------+
|1411686882283511816|         lugwno|Sun Jul 04 14:02:...|RT @mckatiaafiel:...|   140|
|1411686899559940105|     Propalando|Sun Jul 04 14:02:...|@nildaalcarinque ...|    72|
|1411686906165968900|        biagfig|Sun Jul 04 14:02:...|RT @DiogoGTB: Pes...|   139|
|1411686921147912195|  valmirconsoni|Sun Jul 04 14:02:...|RT @caiocopolla: ...|   140|
|1411686933387001857|  franciscaa_99|Sun Jul 04 14:02:...|RT @Kmbitches: Ta...|   140|
|1411686945177079814|  Djasantastico|Sun Jul 04 14:02:...|RT @_makavelijone...|   140|
|1411686954807222278|  AurineteTiago|Sun Jul 04 14:02:...|RT @senadorhumber...|   139|
|1411686885186052100|     AngeloFF04|Sun Jul 04 14:02:...|RT @Inesszinhaa: ...|    70|
|1411686900939755523|        dehta_s|Sun Ju

In [11]:
df.printSchema()

root
 |-- User_ID: string (nullable = true)
 |-- User_Name: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Tweet: string (nullable = true)
 |-- Length: integer (nullable = true)



In [12]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer

In [13]:
tokenizer = Tokenizer(inputCol="Tweet", outputCol="token_text")

stop_remover = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
stop_remover.loadDefaultStopWords(language='portuguese')

count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')

idf = IDF(inputCol="c_vec", outputCol="tf_idf")

In [14]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [15]:
clean_up = VectorAssembler(inputCols=['tf_idf','Length'],outputCol='features')

In [16]:
from pyspark.ml import Pipeline

In [17]:
data_prep_pipe = Pipeline(stages=[tokenizer,stop_remover,count_vec,idf,clean_up])

In [18]:
cleaner = data_prep_pipe.fit(df)

In [19]:
clean_data = cleaner.transform(df)

In [21]:
clean_data.show(truncate=True)

+-------------------+---------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            User_ID|      User_Name|                Date|               Tweet|Length|          token_text|         stop_tokens|               c_vec|              tf_idf|            features|
+-------------------+---------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|1411686882283511816|         lugwno|Sun Jul 04 14:02:...|RT @mckatiaafiel:...|   140|[rt, @mckatiaafie...|[rt, @mckatiaafie...|(3242,[0,1,4,6,7,...|(3242,[0,1,4,6,7,...|(3243,[0,1,4,6,7,...|
|1411686899559940105|     Propalando|Sun Jul 04 14:02:...|@nildaalcarinque ...|    72|[@nildaalcarinque...|[@nildaalcarinque...|(3242,[0,2,3,6,8,...|(3242,[0,2,3,6,8,...|(3243,[0,2,3,6,8,...|
|1411686906165968900|        biagfig|Sun