# Index
* [Scraping](../scraper/Web-scraping.ipynb)
* [Cleaning and Feature engineering](#Cleaning-and-Feature-engineering)
* [Modeling](../modelado/Modelado-y-visualizaciones.ipynb)

# Cleaning and Feature engineering

## Goal
**Clean, sort and/or select semi-structured data, in this case JSON**

- We have two days of Twitter data.
- Suppose we only want some of the fields.
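Before Spark enters the picture, the goal can be sketched in plain Python: read JSON Lines (one tweet object per line, the layout `spark.read.json` expects by default) and keep only a few fields, using `None` where a field is missing, much as Spark fills in `null`. The sample records and the `pick`/`select` helpers are hypothetical, for illustration only.

```python
import json

# Two hypothetical tweets in JSON Lines format (one JSON object per line).
SAMPLE = """\
{"id": 1, "text": "hola", "geo": null, "coordinates": null}
{"id": 2, "text": "hi", "geo": {"type": "Point", "coordinates": [20.85, -86.87]}, "coordinates": {"type": "Point", "coordinates": [-86.87, 20.85]}}
"""


def pick(record, path):
    """Follow a dotted path such as 'geo.coordinates'; return None if any step is missing or null."""
    value = record
    for key in path.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return value


def select(lines, *paths):
    """Parse each non-empty line and keep only the requested (possibly nested) fields."""
    rows = []
    for line in lines.splitlines():
        if line.strip():
            record = json.loads(line)
            rows.append({p: pick(record, p) for p in paths})
    return rows


rows = select(SAMPLE, "id", "geo.coordinates")
print(rows)
```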

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder\
        .appName("Fintech")\
        .getOrCreate()

In [None]:
df = spark.read.json('./tweets.json')

In [None]:
df.printSchema()

In [None]:
df.show()

In [None]:
df.select('geo').show()

In [None]:
df.select('coordinates').show()

### geo vs coordinates


![coordinates field in Twitter](./imgs/campo_coordinates.png "Twitter Coordinate")

![geo field in Twitter](./imgs/campo_geo.png "Twitter Geo")
Source: [Introduction to tweet JSON. In Twitter Documentation. Retrieved 20:45, May 1, 2018](https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object)
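Per the Twitter documentation cited above, `geo` is deprecated and stores the point as `[latitude, longitude]`, while `coordinates` follows GeoJSON and stores `[longitude, latitude]`; the sample row later in this notebook shows exactly that swap. A small plain-Python check of that relationship, with hypothetical helper names:

```python
def geo_to_geojson(geo_pair):
    """Convert a deprecated `geo` pair [lat, long] into GeoJSON order [long, lat]."""
    lat, lon = geo_pair
    return [lon, lat]


def is_consistent(geo_pair, coordinates_pair):
    """True when both fields describe the same point, just in opposite order."""
    return geo_to_geojson(geo_pair) == coordinates_pair


# Values taken from the sample row shown later in this notebook.
geo = [20.850748, -86.87462189]
coordinates = [-86.87462189, 20.850748]
print(is_consistent(geo, coordinates))  # True
```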


In [None]:
df.select('geo').where('geo is not null').count()

In [None]:
df.select('geo').where('geo is null').count()

In [None]:
df.select('geo').where('coordinates is not null').count()

In [None]:
df.select('geo').where('coordinates is null').count()
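The four counts above should pass two sanity checks: for each field, the null and non-null counts add up to the total number of tweets, and, if `geo` is only a legacy copy of `coordinates`, both fields are null on exactly the same tweets. A plain-Python sketch of those checks over hypothetical records (the `null_counts` helper is illustrative):

```python
# Hypothetical records: only the two location fields matter here.
records = [
    {"geo": None, "coordinates": None},
    {"geo": {"coordinates": [20.85, -86.87]}, "coordinates": {"coordinates": [-86.87, 20.85]}},
    {"geo": None, "coordinates": None},
]


def null_counts(rows, field):
    """Return (not_null, null) counts for one top-level field."""
    not_null = sum(1 for r in rows if r.get(field) is not None)
    return not_null, len(rows) - not_null


geo_counts = null_counts(records, "geo")
coord_counts = null_counts(records, "coordinates")

# Check 1: each pair of counts covers every record.
assert sum(geo_counts) == sum(coord_counts) == len(records)
# Check 2: geo and coordinates are null on exactly the same records.
same_nulls = all((r["geo"] is None) == (r["coordinates"] is None) for r in records)
print(geo_counts, coord_counts, same_nulls)  # (1, 2) (1, 2) True
```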

In [None]:
df.select('id','geo','coordinates').where('coordinates is not null').limit(1).show()

In [None]:
df.select('id','geo','coordinates') \
    .where('coordinates is not null') \
    .printSchema()

In [None]:
from pyspark.sql.functions import explode, col

In [None]:
# Filter first: after this select, the name 'coordinates' would refer to geo.coordinates
df.where('coordinates is not null') \
    .select('id', 'geo.coordinates') \
    .limit(1).show(truncate=False)

In [None]:
df.select(
        'id',
        col('geo.coordinates').alias('geo_coordinates'),
        col('coordinates.coordinates').alias('coordinates_coordinates')
    ) \
    .where('coordinates is not null') \
    .limit(1).show(truncate=False)

In [None]:
df.select(
        'id',
        explode(col('geo.coordinates')).alias('geo_coordinates'),
        col('coordinates.coordinates').alias('coordinates_coordinates')
    ) \
    .where('coordinates is not null') \
    .limit(2).show(truncate=False)

In [None]:
df.select(
        'id',
        explode(col('geo.coordinates')).alias('geo_coordinates'),
        col('coordinates.coordinates').alias('coordinates_coordinates')
    ) \
    .where('coordinates is not null') \
    .select(
        "id",
        "geo_coordinates",
        explode(col('coordinates_coordinates'))
    ).show(truncate=False)

In [None]:
df.select(
        'id',
        explode(col('geo.coordinates')).alias('geo_coordinates'),
        col('coordinates.coordinates').alias('coordinates_coordinates')
    ) \
    .where('coordinates is not null') \
    .select(
        "id",
        "geo_coordinates",
        explode(col('coordinates_coordinates'))
    ).count()

In [None]:
df.select(
        'id',
        col('geo.coordinates').alias('geo_coordinates'),
        col('coordinates.coordinates').alias('coordinates_coordinates')
    ) \
    .where('coordinates is not null') \
    .count()

In [None]:
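# Expected rows after both explodes: tweets with coordinates (count above) x 2 geo values x 2 coordinates values
8565*2*2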

<h3>Initial state:</h3>

|id                |geo_coordinates|coordinates_coordinates         |
|------------------|---------------|------------|
|994365883396116481|[20.850748,-86.87462189]|[-86.87462189,20.850748]|


<h3>The first column's array is exploded into one row per element; the second array is repeated unchanged:</h3>

|id                |geo_coordinates|coordinates_coordinates         |
|------------------|---------------|------------|
|994365883396116481|20.850748      |[-86.87462189,20.850748]|
|994365883396116481|-86.87462189   |[-86.87462189,20.850748]|


<h3>Then the second array is exploded (Spark names the resulting column col):</h3>

|id                |geo_coordinates|col         |
|------------------|---------------|------------|
|994365883396116481|20.850748      |-86.87462189|
|994365883396116481|20.850748      |20.850748   |
|994365883396116481|-86.87462189   |-86.87462189|
|994365883396116481|-86.87462189   |20.850748   |
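The tables above are a Cartesian product: exploding an array of length m and then an array of length n turns one row into m × n rows, which is why the exploded count is compared against `8565*2*2`. A plain-Python model of two successive `explode` calls; the `explode` helper here only imitates Spark's function and, like it, emits nothing for an empty or null array:

```python
def explode(rows, column, new_name):
    """Imitate Spark's explode: one output row per array element; rows with a null or empty array vanish."""
    out = []
    for row in rows:
        for item in row[column] or []:
            new_row = {k: v for k, v in row.items() if k != column}
            new_row[new_name] = item
            out.append(new_row)
    return out


initial = [{
    "id": 994365883396116481,
    "geo_coordinates": [20.850748, -86.87462189],
    "coordinates_coordinates": [-86.87462189, 20.850748],
}]

step1 = explode(initial, "geo_coordinates", "geo_coordinates")
step2 = explode(step1, "coordinates_coordinates", "col")
for row in step2:
    print(row["id"], row["geo_coordinates"], row["col"])
print(len(step2))  # 4 == 1 tweet * 2 * 2
```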

In [None]:
df.select(
        'id',
        col('geo.coordinates').alias('geo_coordinates')
    ) \
    .where('coordinates is not null') \
    .show()

In [None]:
df.select(
        'id',
        col('geo.coordinates').alias('geo_coordinates')
    ) \
    .where('coordinates is not null') \
    .select(
        "id",
        col('geo_coordinates').getItem(0),
        col('geo_coordinates').getItem(1)
    ).show()

In [None]:
df.select(
        'id',
        col('geo.coordinates').alias('geo_coordinates')
    ) \
    .where('coordinates is not null') \
    .select(
        "id",
        col('geo_coordinates').getItem(0).alias('geo_coordinate_lat'),
        col('geo_coordinates').getItem(1).alias('geo_coordinate_long')
    ).show()
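`getItem(0)` and `getItem(1)` pick elements by position, so these aliases are only right because `geo` stores latitude first; the same positions on `coordinates.coordinates` would give longitude and then latitude. A null array always yields `null`, and with ANSI mode disabled (`spark.sql.ansi.enabled=false`) so does an out-of-range index. A plain-Python imitation of that behavior, with hypothetical helper names:

```python
def get_item(array, index):
    """Imitate Column.getItem on an array without ANSI mode: None for a null array or an out-of-range index."""
    if array is None or not 0 <= index < len(array):
        return None
    return array[index]


def lat_long_from_geo(geo_coordinates):
    """Split a `geo` pair, which Twitter stores as [latitude, longitude]."""
    return {
        "geo_coordinate_lat": get_item(geo_coordinates, 0),
        "geo_coordinate_long": get_item(geo_coordinates, 1),
    }


print(lat_long_from_geo([20.850748, -86.87462189]))
print(lat_long_from_geo(None))  # both values are None
```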

In [None]:
df.select(
        'id',
        col('geo.coordinates').alias('geo_coordinates')
    ) \
    .where('coordinates is not null') \
    .select(
        "id",
        col('geo_coordinates').getItem(0).alias('geo_coordinate_lat'),
        col('geo_coordinates').getItem(1).alias('geo_coordinate_long')
    ).write.mode('overwrite').csv(
        "./coordenadas_twitter",
        sep="|",
        header=True)

In [None]:
df.select(
        'id',
        col('geo.coordinates').alias('geo_coordinates')
    ) \
    .where('coordinates is not null') \
    .select(
        "id",
        col('geo_coordinates').getItem(0).alias('geo_coordinate_lat'),
        col('geo_coordinates').getItem(1).alias('geo_coordinate_long')
    ).coalesce(1).write.mode('overwrite').csv(
        "./coordenadas_twitter_en_uno",
        sep="|",
        header=True)
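Spark writes each of these outputs as a directory rather than a single file: one `part-*` file per partition (each with its own header row when `header=True`) plus a `_SUCCESS` marker, which is why `coalesce(1)` is used to end up with a single part file. A plain-Python sketch for reading such a directory back with the standard `csv` module, assuming the `|` separator used above; `read_spark_csv_dir` is a hypothetical helper:

```python
import csv
import glob
import os


def read_spark_csv_dir(path, sep="|"):
    """Read every part-* file in a Spark CSV output directory; each part file carries its own header."""
    rows = []
    for part in sorted(glob.glob(os.path.join(path, "part-*"))):
        with open(part, newline="", encoding="utf-8") as handle:
            rows.extend(csv.DictReader(handle, delimiter=sep))
    return rows


# Example (assumes the notebook cell above has already run):
# rows = read_spark_csv_dir("./coordenadas_twitter_en_uno")
# print(len(rows), rows[:1])
```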

## Columnar formats

In [None]:
! ls -lh tweets.json

In [None]:
df.write.mode('overwrite').parquet("tweets_json_parquet")

In [None]:
! du -h tweets_json_parquet
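`ls -lh` reports the size of the single JSON file and `du -h` the disk usage of the whole Parquet directory. The same comparison can be made from Python with the standard library, for example to print a size ratio; `path_size` and `ratio` are hypothetical helpers, and they sum apparent file sizes, which can differ slightly from the block usage `du` reports.

```python
import os


def path_size(path):
    """Size in bytes of a file, or the sum of the apparent sizes of all files under a directory."""
    if os.path.isfile(path):
        return os.path.getsize(path)
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total


def ratio(original, converted):
    """How many times larger `original` is than `converted`."""
    return path_size(original) / path_size(converted)


# Example (assumes both paths from the cells above exist):
# print(f"The JSON is {ratio('tweets.json', 'tweets_json_parquet'):.1f}x the size of the Parquet output")
```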

In [None]:
df = spark.read.parquet('tweets_json_parquet')

In [None]:
df.count()

In [None]:
df.select(
        'id',
        col('geo.coordinates').alias('geo_coordinates')
    ) \
    .where('coordinates is not null') \
    .select(
        "id",
        col('geo_coordinates').getItem(0).alias('geo_coordinate_lat'),
        col('geo_coordinates').getItem(1).alias('geo_coordinate_long')
    ).show()

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL
# (registerTempTable is deprecated since Spark 2.0)
df.createOrReplaceTempView('tweets')

In [None]:
spark.sql("describe tweets").show()

In [None]:
spark.sql("select id,favorite_count " 
          "from tweets "
         "where favorite_count > 0 ").show()

In [None]:
spark.sql("select id,is_quote_status "
          "from tweets "
          "where is_quote_status = true ").show()

In [None]:
spark.sql("select is_quote_status,count(*) " 
          "from tweets "
         "group by is_quote_status").show()

In [None]:
spark.sql("select id,count(*) " 
          "from tweets "
         "group by id "
         "having count(*)>1").show()

In [None]:
spark.sql("select id,count(*) " 
          "from tweets "
         "group by id "
         "having count(*)>1").count()
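The `GROUP BY id HAVING count(*) > 1` pattern is plain SQL, so it can be tried outside Spark. Here it runs on Python's built-in `sqlite3` with a few hypothetical ids; this is an sqlite3 analogue for illustration, not Spark SQL itself (the `order by` is added only to make the output deterministic).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table tweets (id integer, favorite_count integer)")
# Hypothetical rows: id 2 appears twice, id 3 three times.
conn.executemany(
    "insert into tweets values (?, ?)",
    [(1, 0), (2, 5), (2, 5), (3, 1), (3, 1), (3, 1)],
)

# Same query shape as the Spark SQL cells above.
duplicates = conn.execute(
    "select id, count(*) from tweets group by id having count(*) > 1 order by id"
).fetchall()
print(duplicates)       # [(2, 2), (3, 3)]
print(len(duplicates))  # 2: how many ids repeat
```

Note that `.count()` on that query counts the ids that repeat, not the number of surplus rows (here 2 ids, but 3 extra rows).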

In [None]:
spark.sql("describe tweets").show()

In [None]:
spark.sql("select entities from tweets").printSchema()

In [None]:
spark.sql("select entities.user_mentions from tweets").printSchema()

In [None]:
spark.sql("select entities.user_mentions from tweets").show()

In [None]:
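# Only needed for the DataFrame API (e.g. df.select(size(...))); inside spark.sql(), size is a built-in SQL function
from pyspark.sql.functions import size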

In [None]:
spark.sql("select size(entities.user_mentions) from tweets").show()

In [None]:
spark.sql("select size(entities.user_mentions),count(*) "
          "from tweets "
          "group by size(entities.user_mentions) ").show()
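`size(entities.user_mentions)` is the length of the mentions array, so the grouped query is a histogram of mentions per tweet. One detail to watch: with default settings in Spark 2.4 and 3.x (`spark.sql.legacy.sizeOfNull=true`, ANSI mode off), `size` returns `-1` for a null array rather than `0` or `null`. A plain-Python imitation using `collections.Counter`, over hypothetical mention lists:

```python
from collections import Counter


def spark_size(array):
    """Imitate Spark's size() under default legacy settings: -1 for a null array, otherwise its length."""
    return -1 if array is None else len(array)


# Hypothetical entities.user_mentions values for five tweets.
mentions = [[], [{"id": 1}], [{"id": 1}, {"id": 2}], [], None]

histogram = Counter(spark_size(m) for m in mentions)
print(sorted(histogram.items()))  # [(-1, 1), (0, 2), (1, 1), (2, 1)]
```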

In [None]:
spark.sql("select * "
          "from tweets "
          "where size(entities.user_mentions) >= 10").count()

In [None]:
spark.sql("select entities.user_mentions "
          "from tweets "
          "where size(entities.user_mentions) > 10").show()

In [None]:
spark.sql("select entities.user_mentions "
          "from tweets "
          "where size(entities.user_mentions) > 10").printSchema()

In [None]:
spark.sql("select entities.user_mentions[0] "
          "from tweets "
          "where size(entities.user_mentions) > 10").printSchema()

In [None]:
spark.sql("select entities.user_mentions[0].id, "
          "entities.user_mentions[0].name, "
          "entities.user_mentions[0].screen_name "
          "from tweets "
          "where size(entities.user_mentions) > 10").show()

In [None]:
spark.sql("select entities.user_mentions[0].id as id, "
          "entities.user_mentions[0].name as name, "
          "entities.user_mentions[0].screen_name as screen_name "
          "from tweets "
          "where size(entities.user_mentions) > 10").show()

In [None]:
spark.sql("select id,explode(entities.user_mentions) as user_mentions "
          "from tweets ").show()

In [None]:
spark.sql("select id,user_mentions.name,user_mentions.screen_name "
          "from (select id,explode(entities.user_mentions) as user_mentions "
          "from tweets) as tablon").show()
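The subquery `explode(entities.user_mentions)` turns a tweet with n mentions into n rows, and tweets whose mentions array is empty (or null) disappear from the result; Spark's `explode_outer` keeps them with null values instead. A plain-Python model of that flattening, with hypothetical data and a hypothetical `flatten_mentions` helper:

```python
def flatten_mentions(tweets, outer=False):
    """One row per (tweet id, mention); with outer=True, tweets without mentions keep a row of None values."""
    rows = []
    for tweet in tweets:
        mentions = tweet["entities"]["user_mentions"] or []
        if not mentions and outer:
            rows.append((tweet["id"], None, None))
        for m in mentions:
            rows.append((tweet["id"], m["name"], m["screen_name"]))
    return rows


tweets = [
    {"id": 10, "entities": {"user_mentions": [
        {"name": "Ana", "screen_name": "ana"},
        {"name": "Luis", "screen_name": "luis"},
    ]}},
    {"id": 11, "entities": {"user_mentions": []}},
]

print(flatten_mentions(tweets))              # like explode: tweet 11 is gone
print(flatten_mentions(tweets, outer=True))  # like explode_outer: tweet 11 is kept
```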

In [None]:
spark.sql("select id,user_mentions.id as mentions_id,user_mentions.name,user_mentions.screen_name "
          "from (select id,explode(entities.user_mentions) as user_mentions "
          "from tweets) as tablon").count()

In [None]:
spark.sql("select id,user_mentions.id as mentions_id,user_mentions.name,user_mentions.screen_name "
          "from (select id,explode(entities.user_mentions) as user_mentions "
          "from tweets) as tablon").distinct().count()
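Comparing `.count()` with `.distinct().count()` shows how many (tweet, mention) rows are exact repeats: if a tweet id appears more than once (which the `having count(*)>1` query above checks), each copy contributes its mentions again. A plain-Python model using a set, with hypothetical rows:

```python
# Hypothetical (tweet_id, mention_id, name, screen_name) rows after the explode;
# tweet 10 was captured twice, so its mention appears twice.
rows = [
    (10, 1, "Ana", "ana"),
    (10, 1, "Ana", "ana"),
    (11, 2, "Luis", "luis"),
]

total = len(rows)                # like .count()
distinct_total = len(set(rows))  # like .distinct().count()
print(total, distinct_total, total - distinct_total)  # 3 2 1
```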

In [None]:
df.count()