In [2]:
import argparse
from os.path import join
import boto3

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t

In [3]:
    spark = SparkSession\
        .builder\
        .appName("twitter_transformation")\
        .getOrCreate()

In [4]:
s3 = boto3.resource('s3')

for bucket in s3.buckets.all():
    print(bucket.name)

aws-logs-575556700570-us-east-1
xpto-refined
xpto-scripts
xptoraw


In [6]:
tweets = t.StructType([
    t.StructField("id", t.LongType(), nullable=False),
    t.StructField("text", t.StringType(), nullable=False),
    t.StructField("created_at", t.TimestampType(), nullable=False)
])

df = spark.read.option("quotes", "'").schema(tweets).csv('/home/rafael_ignaulin/Desktop/COMPASSO/Sprint_5/week_09/twitter_raw.csv')

In [7]:
df.show(10, False)
df.printSchema()

+-------------------+-----------------------------------------------------+-------------------+
|id                 |text                                                 |created_at         |
+-------------------+-----------------------------------------------------+-------------------+
|1029910648661170000|@Luis_mergulhado To confiante disso :)               |2018-08-16 02:00:08|
|1029910699223460000|Ele tá me fazendo um bem danado haa :)               |2018-08-16 02:00:20|
|1029910699710010000|@alanzoka Nem assisti, mas de nada :)                |2018-08-16 02:00:20|
|1029910704550280000|Vitinho é isso aí? Dez. Milhões. De. Euros. :)       |2018-08-16 02:00:21|
|1029910744186450000|Hora de desligar o pc e ir dormir :/ , Boa Noitee! :D|2018-08-16 02:00:30|
|1029910797252740000|E finalmente de volta à realidade :)                 |2018-08-16 02:00:43|
|1029910807591690000|@nenedasazon oii :) acho q vamos nos ver sábado      |2018-08-16 02:00:46|
|1029910828898750000|@BeatrizDinis9 Eheh

In [8]:
df.show(1000)

+-------------------+--------------------+-------------------+
|                 id|                text|         created_at|
+-------------------+--------------------+-------------------+
|1029910648661170000|@Luis_mergulhado ...|2018-08-16 02:00:08|
|1029910699223460000|Ele tá me fazendo...|2018-08-16 02:00:20|
|1029910699710010000|@alanzoka Nem ass...|2018-08-16 02:00:20|
|1029910704550280000|Vitinho é isso aí...|2018-08-16 02:00:21|
|1029910744186450000|Hora de desligar ...|2018-08-16 02:00:30|
|1029910797252740000|E finalmente de v...|2018-08-16 02:00:43|
|1029910807591690000|@nenedasazon oii ...|2018-08-16 02:00:46|
|1029910828898750000|@BeatrizDinis9 Eh...|2018-08-16 02:00:51|
|1029910838839260000|@londeegabi Vc só...|2018-08-16 02:00:53|
|1029910841230010000|   ai hj to feliz :D|2018-08-16 02:00:54|
|1029910850860140000|Você gostaria de ...|2018-08-16 02:00:56|
|1029910869675780000|isso ai, fica bri...|2018-08-16 02:01:00|
|1029910886918630000|como foi seu dia ...|2018-08-16 02

In [9]:
df.groupBy(f.to_date("created_at")).count().show(100, False)

+-------------------+-----+
|to_date(created_at)|count|
+-------------------+-----+
|2018-09-01         |374  |
|2018-09-09         |4331 |
|2018-10-05         |465  |
|2018-09-08         |2003 |
|2018-08-31         |548  |
|2018-09-18         |15432|
|2018-09-24         |23080|
|2018-09-19         |25089|
|2018-09-30         |8553 |
|2018-09-11         |22943|
|2018-10-01         |10414|
|2018-08-28         |1250 |
|null               |153  |
|2018-09-16         |7615 |
|2018-08-29         |243  |
|2018-09-15         |22946|
|2018-08-24         |726  |
|2018-08-16         |10693|
|2018-10-02         |11258|
|2018-08-17         |12552|
|2018-09-06         |2975 |
|2018-09-13         |20068|
|2018-10-08         |18052|
|2018-09-03         |8614 |
|2018-09-07         |733  |
|2018-08-19         |11019|
|2018-08-25         |944  |
|2018-09-05         |7271 |
|2018-09-22         |13433|
|2018-08-26         |1565 |
|2018-09-26         |24224|
|2018-08-21         |3234 |
|2018-09-12         

In [10]:
df2 = df.withColumn("created_date", f.to_date("created_at")).repartition("created_date")
    # .withColumn("sentimento")
    # .withColumn("simbolo")
df2.show(10)
# df2.filter(df2.text.contains(':)')).collect()

+-------------------+--------------------+-------------------+------------+
|                 id|                text|         created_at|created_date|
+-------------------+--------------------+-------------------+------------+
|1035885099630450000|só pq podia acord...|2018-09-01 13:40:28|  2018-09-01|
|1035885109856160000|@beaaacruzz O que...|2018-09-01 13:40:30|  2018-09-01|
|1035885117149980000|@salvinerocha Fal...|2018-09-01 13:40:32|  2018-09-01|
|1035885146669540000|@SrRaposildo2210 ...|2018-09-01 13:40:39|  2018-09-01|
|1035885207658880000|Só vamo :D https:...|2018-09-01 13:40:53|  2018-09-01|
|1035885212180400000|@pedrocsemedo Opá...|2018-09-01 13:40:55|  2018-09-01|
|1035885244967210000|@marcelo__biomed ...|2018-09-01 13:41:02|  2018-09-01|
|1035885269105430000|Vou começar a ler...|2018-09-01 13:41:08|  2018-09-01|
|1035885316857640000|@andersonpinero S...|2018-09-01 13:41:20|  2018-09-01|
|1035885335308390000|nem acreito que c...|2018-09-01 13:41:24|  2018-09-01|
+-----------

In [31]:
# df.select(df.created_at.isNull()).show(100, False)
df.na.fill(f.Row(df.created_at + 1), ["created_at"]).show()

ValueError: value should be a float, int, string, bool or dict

In [None]:
df2.write.mode("overwrite").partitionBy("created_date").csv("/home/rafael_ignaulin/Desktop/COMPASSO/Sprint_5/week_09/export")