# ETL - Preprocessing

Use Spark to process a dataset of tweets in its original format and answer the questions below.

The original tweet archive contains one folder per day, which in turn contains one folder per hour. Inside each hour folder there is one bz2-compressed file per minute, holding a JSON file with the tweets sent during that minute in all languages. The /files folder contains the tweets from 30/9/2011 in this format (only one hour, to save space).

Use Spark to extract the tweets, parse the JSON and convert the tweets into a suitable format.
You will need to build an array containing the paths to the individual files and turn it into an RDD to work with.
Then write a function that maps a file path to a list of tuples of the form (**user, date, tweet_content**). From there, you can handle the RDD the same way as in assignment 2.
You can take ideas from this script, which reads all the files in a directory and decompresses them from bz2.

In [2]:
import os, bz2, json
from pyspark import SparkContext

tweets = []
json_files_paths = []
path = './tweets/30/01/'

def has_required_properties(tweet):
    # Records that are not tweets (e.g. deletion notices) lack these fields
    return 'user' in tweet and 'created_at' in tweet and 'text' in tweet

for (dirpath, dirnames, files) in os.walk(path):
    for filename in files:
        if not filename.endswith('.bz2'):
            continue  # Skip JSON files left over from a previous run
        filepath = os.path.join(dirpath, filename)
        json_file_path = filepath[:-4]  # Strip the '.bz2' extension

        # Decompress the minute file into a plain JSON file next to it
        with bz2.BZ2File(filepath) as bz2_file:
            data = bz2_file.read()
        with open(json_file_path, 'wb') as json_out:
            json_out.write(data)
        json_files_paths.append(json_file_path)

        # One JSON record per line; keep only the records that are tweets
        with open(json_file_path, 'r') as json_stream:
            for file_line in json_stream:
                if not file_line.strip():
                    continue  # Ignore blank lines
                json_parsed = json.loads(file_line)
                if has_required_properties(json_parsed):
                    tweets.append(json_parsed)

['47.json.bz2', '44.json.bz2', '02.json.bz2', '21.json.bz2', '45.json.bz2', '43.json.bz2', '13.json.bz2', '07.json.bz2', '40.json.bz2', '31.json.bz2', '58.json.bz2', '10.json.bz2', '38.json.bz2', '23.json.bz2', '37.json.bz2', '30.json.bz2', '35.json.bz2', '59.json.bz2', '05.json.bz2', '26.json.bz2', '17.json.bz2', '52.json.bz2', '49.json.bz2', '39.json.bz2', '04.json.bz2', '55.json.bz2', '56.json.bz2', '48.json.bz2', '00.json.bz2', '08.json.bz2', '20.json.bz2', '42.json.bz2', '24.json.bz2', '28.json.bz2', '18.json.bz2', '25.json.bz2', '33.json.bz2', '36.json.bz2', '46.json.bz2', '41.json.bz2', '09.json.bz2', '16.json.bz2', '34.json.bz2', '57.json.bz2', '51.json.bz2', '32.json.bz2', '54.json.bz2', '14.json.bz2', '50.json.bz2', '01.json.bz2', '11.json.bz2', '06.json.bz2', '22.json.bz2', '27.json.bz2', '19.json.bz2', '53.json.bz2', '15.json.bz2', '03.json.bz2', '29.json.bz2', '12.json.bz2']
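The cell above decompresses and parses every file on the driver, and only then hands the tweets to Spark. The statement instead asks for an RDD of file paths and a function that maps each path to (user, date, tweet_content) tuples. Here is a minimal sketch of that approach. It assumes one JSON record per line, as in the cell above. `list_bz2_paths` and `path_to_tuples` are hypothetical names.

```python
import bz2
import json
import os


def list_bz2_paths(root):
    """Collect the paths of all .bz2 minute files below root (day/hour/minute layout)."""
    paths = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for filename in filenames:
            if filename.endswith('.bz2'):
                paths.append(os.path.join(dirpath, filename))
    return sorted(paths)


def path_to_tuples(bz2_path):
    """Map one .json.bz2 file to a list of (user, date, text) tuples.

    Assumes one JSON record per line; records lacking 'user', 'created_at'
    or 'text' are skipped.
    """
    result = []
    # BZ2File decompresses while reading, so no temporary JSON file is written
    with bz2.BZ2File(bz2_path) as stream:
        for raw_line in stream:
            line = raw_line.decode('utf-8').strip()
            if not line:
                continue  # ignore blank lines
            record = json.loads(line)
            if 'user' in record and 'created_at' in record and 'text' in record:
                result.append((record['user']['screen_name'],
                               record['created_at'],
                               record['text']))
    return result
```

In the notebook, the two helpers could be combined as `sc.parallelize(list_bz2_paths('./tweets/30/01/')).flatMap(path_to_tuples)`. `flatMap` concatenates the per-file lists into one RDD of tuples. This assumes every worker can read the same paths, which holds in `local` mode.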


In [400]:
# Remove json files created
for json_path in json_files_paths:
    os.remove(json_path)

In [3]:
len(tweets)

46562

In [4]:
# Tweet structure sample
print(json.dumps(tweets[0], indent=4, sort_keys=True))

{
    "contributors": null, 
    "coordinates": null, 
    "created_at": "Fri Sep 30 07:47:00 +0000 2011", 
    "entities": {
        "hashtags": [], 
        "urls": [], 
        "user_mentions": []
    }, 
    "favorited": false, 
    "geo": null, 
    "id": 119679624430104576, 
    "id_str": "119679624430104576", 
    "in_reply_to_screen_name": null, 
    "in_reply_to_status_id": null, 
    "in_reply_to_status_id_str": null, 
    "in_reply_to_user_id": null, 
    "in_reply_to_user_id_str": null, 
    "place": null, 
    "retweet_count": 0, 
    "retweeted": false, 
    "source": "<a href=\"http://ubersocial.com\" rel=\"nofollow\">UberSocial for Android</a>", 
    "text": "I think I mite sleep my whole day tomorrow I owe it to my body n mind", 
    "truncated": false, 
    "user": {
        "contributors_enabled": false, 
        "created_at": "Sun May 10 00:45:21 +0000 2009", 
        "default_profile": false, 
        "default_profile_image": false, 
        "description": "MC,PROD

In [5]:
# Create context and load tweets

sc = SparkContext("local", "Tutorial")
rdd_tweets = sc.parallelize(tweets)
type(rdd_tweets)

pyspark.rdd.RDD

In [376]:
# Keep only tweets whose author's profile language is 'es' (the sample tweet above
# has no top-level 'lang' field) and reduce each one to user, text and timestamp

def is_es_language(tweet):
    return tweet['user']['lang'] == 'es'

def format_tweet(tweet):
    return { 
        'user': tweet['user']['screen_name'],
        'text': tweet['text'], 
        'timestamp': tweet['created_at']
    }

rdd_tweets_es = (rdd_tweets
                 .filter(is_es_language) # Keep tweets whose user language is 'es'
                 .map(format_tweet) # Keep only user, text and timestamp
                )


rdd_tweets_es.persist()
rdd_tweets_es.count()

2137
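`format_tweet` keeps `created_at` as the raw string. Strings such as `Fri Sep 30 07:47:00 +0000 2011` only sort chronologically while they share the same day prefix, as in this one-hour sample. Across days they do not: `Mon Oct 03 ...` sorts before `Sat Oct 01 ...`. This matters for the timestamp sort in Part 6.

Below is a sketch of parsing the field into a `datetime`, plus a tuple variant matching the (user, date, tweet_content) shape from the statement. It assumes every timestamp carries the literal `+0000` offset seen in the samples, and English day and month names (default C locale). `format_tweet_tuple` is a hypothetical name.

```python
from datetime import datetime

# Format of created_at as seen in this dataset, e.g. "Fri Sep 30 07:47:00 +0000 2011".
# Assumption: the offset is always the literal "+0000" (UTC).
CREATED_AT_FORMAT = '%a %b %d %H:%M:%S +0000 %Y'


def parse_created_at(created_at):
    """Turn a raw created_at string into a naive datetime in UTC."""
    return datetime.strptime(created_at, CREATED_AT_FORMAT)


def format_tweet_tuple(tweet):
    """Hypothetical variant of format_tweet returning (user, date, text)."""
    return (tweet['user']['screen_name'],
            parse_created_at(tweet['created_at']),
            tweet['text'])
```

Sorting on the parsed value would look like `rdd.sortBy(lambda tweet: parse_created_at(tweet['timestamp']))`, which stays chronological across days and months.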

In [378]:
# Formatted tweet sample
print(json.dumps(rdd_tweets_es.take(10), indent=4, sort_keys=True))

[
    {
        "text": "@mariascc gracias cejuuuu!!! ^^", 
        "timestamp": "Fri Sep 30 07:47:00 +0000 2011", 
        "user": "USK_123Ska"
    }, 
    {
        "text": "ma\u00f1ana viernes pesado todo mundo quiere salir de viaje y asi", 
        "timestamp": "Fri Sep 30 07:47:00 +0000 2011", 
        "user": "tiiimy"
    }, 
    {
        "text": "@Monjeton @ot_panini_luna sale que descanses :)", 
        "timestamp": "Fri Sep 30 07:47:01 +0000 2011", 
        "user": "becan05"
    }, 
    {
        "text": "@CacaitoLZF no te amargues por lo que no tiene remedio. Si no la olvidas, hazle creer que no la recuerdas.", 
        "timestamp": "Fri Sep 30 07:47:02 +0000 2011", 
        "user": "magde27"
    }, 
    {
        "text": "recien vuelvo de Rumi, Niceto y Esperanto, la juventud/generaci\u00f3n esta perdida....", 
        "timestamp": "Fri Sep 30 07:47:07 +0000 2011", 
        "user": "NachoRockr"
    }, 
    {
        "text": "@MarinaStriebeck. Jaja toppp modeelll http://t.co

## Part 1

In [379]:
# Count tweets per user and sort users by that count

total_tweets_by_user = (rdd_tweets_es
                        .map(lambda tweet: (tweet['user'], 1)) # Tuples (user, 1)
                        .reduceByKey(lambda value1, value2: value1 + value2) # Group by user and sum the 1s
                        .sortBy(lambda pair: pair[1], ascending = False) # Sort by tweet count, descending
                       )

In [380]:
# Top 10 users by number of tweets
total_tweets_by_user.take(10)

[(u'AnnhaGarza', 4),
 (u'berbe', 3),
 (u'SamiiCruz', 3),
 (u'Guille_Mi', 3),
 (u'COGAM', 3),
 (u'omerta_666', 3),
 (u'Shushuna_Rangel', 3),
 (u'LuisFer_05', 3),
 (u'issixx', 3),
 (u'ppaolart', 3)]
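Most of this top 10 is tied at 3 tweets, and `sortBy` does not guarantee any particular order among equal counts. Which tied users appear can therefore vary. Here is a sketch of a deterministic ranking that breaks ties alphabetically, shown on plain Python data. `top_n_by_count` is a hypothetical name.

```python
from collections import Counter


def top_n_by_count(items, n):
    """Count items and return the n most frequent as (item, count) pairs.

    Ties on the count are broken by the item itself (ascending), so the
    result does not depend on the input order.
    """
    counts = Counter(items)  # local equivalent of map(x -> (x, 1)) + reduceByKey(add)
    return sorted(counts.items(), key=lambda pair: (-pair[1], pair[0]))[:n]
```

On the RDD, the same key can be passed to `takeOrdered`, which returns the `num` smallest elements according to `key`: `total_tweets_by_user.takeOrdered(10, key=lambda pair: (-pair[1], pair[0]))`.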

## Part 2

In [381]:
# Count word occurrences across all tweet texts and sort words by frequency

def get_text_split_by_word(tweet):
    # Whitespace split only: case and punctuation are kept as they are
    return tweet['text'].split()

tweets_words_sorted = (rdd_tweets_es
                       .flatMap(get_text_split_by_word) # Flatten the words of every tweet into one RDD
                       .map(lambda word: (word, 1)) # Tuples (word, 1)
                       .reduceByKey(lambda value1, value2: value1 + value2) # Group by word and sum the counts
                       .sortBy(lambda _tuple: _tuple[1], ascending=False) # Sort by count, descending
                      )

In [382]:
# Most common words in tweets
tweets_words_sorted.take(10)

[(u'de', 714),
 (u'a', 535),
 (u'que', 508),
 (u'la', 482),
 (u'y', 397),
 (u'en', 363),
 (u'el', 354),
 (u'RT', 328),
 (u'no', 312),
 (u'me', 278)]
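`str.split()` only splits on whitespace. As a result, `RT` and `rt`, `El` and `el`, or `gracias` and `gracias!!!` are counted as different words.

Here is a sketch of a normalizing tokenizer that could replace the split-based function in the `flatMap`. It assumes lower-casing and stripping leading and trailing punctuation are acceptable. With that choice, mentions lose their `@`, pure-punctuation tokens such as `:)` disappear, and URLs keep their inner punctuation. `tokenize` is a hypothetical name.

```python
import string

# Characters stripped from both ends of a token: ASCII punctuation plus the
# Spanish inverted marks and the ellipsis character (U+00A1, U+00BF, U+2026)
PUNCTUATION = string.punctuation + u'\u00a1\u00bf\u2026'


def tokenize(text):
    """Lower-case text, split it on whitespace and strip surrounding punctuation."""
    tokens = []
    for raw in text.lower().split():
        token = raw.strip(PUNCTUATION)
        if token:  # tokens made only of punctuation ('...', '-', ':)') are dropped
            tokens.append(token)
    return tokens
```

In the pipeline, `rdd_tweets_es.flatMap(lambda tweet: tokenize(tweet['text']))` would feed the same `map`/`reduceByKey` steps.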

## Part 3

In [383]:
# Same word ranking, excluding stopwords (the check is case-sensitive,
# so capitalized forms such as 'El' or 'No' are not filtered out)

stopwords = {'como', 'pero', 'o', 'al', 'mas', 'esta', 'le', 'cuando', 'eso', 'su', 'porque',
             'd', 'del', 'los', 'mi', 'si', 'las', 'una', 'q', 'ya', 'yo', 'tu', 'el', 'ella',
             'a', 'ante', 'bajo', 'cabe', 'con', 'contra', 'de', 'desde', 'en', 'entre', 'hacia',
             'hasta', 'para', 'por', 'segun', 'sin', 'so', 'sobre', 'tras', 'que', 'la', 'no', 'y',
             'me', 'es', 'te', 'se', 'un', 'lo'}

def is_not_stopword(item):
    return item[0] not in stopwords

tweets_words_sorted.persist()
tweets_words_sorted_filtered = (tweets_words_sorted
                                .filter(is_not_stopword) # Remove stopwords from the (word, count) tuples
                                .sortBy(lambda _tuple: _tuple[1], ascending=False) # Sort by count, descending
                               )

In [384]:
# Most common words in tweets, excluding stopwords
tweets_words_sorted_filtered.take(10)

[(u'RT', 328),
 (u'...', 83),
 (u'El', 63),
 (u'No', 53),
 (u'-', 52),
 (u':)', 50),
 (u'A', 49),
 (u'La', 49),
 (u'Y', 46),
 (u'ver', 41)]
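The result above still contains `El`, `No`, `A`, `La` and `Y`, because the stopword check is case-sensitive. It also contains the retweet marker `RT` and punctuation-only tokens.

Here is a sketch of a stricter filter over (word, count) pairs. It lower-cases before the lookup and requires at least one letter or digit. The stopword set is shortened for illustration; in the notebook it would be the `stopwords` collection above. Treating `rt` as a stopword is an assumption, not part of the original list. `is_content_word` is a hypothetical name.

```python
# Shortened stopword set for illustration; the notebook's collection is longer.
STOPWORDS = {'de', 'a', 'que', 'la', 'y', 'en', 'el', 'no', 'me'}
# Extra token to drop: the retweet marker (an assumption, not in the original list).
EXTRA_STOPWORDS = {'rt'}


def is_content_word(pair):
    """Keep a (word, count) pair only if the word, lower-cased, is not a
    stopword and contains at least one letter or digit."""
    word = pair[0].lower()
    if word in STOPWORDS or word in EXTRA_STOPWORDS:
        return False
    return any(ch.isalnum() for ch in word)
```

It would be used as `tweets_words_sorted.filter(is_content_word)`. Lower-casing only inside the filter still leaves `El` (63) and `el` (354) as separate counts. Counting lower-cased tokens in the first place is what merges them.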

## Part 4

In [387]:
# Scope of each hashtag = number of times it appears across all tweets

def get_hashtags(tweet):
    # Every chunk after a '#' is a hashtag candidate. Chunks containing a space are
    # cut at the first space and lower-cased; chunks without one (e.g. a hashtag at
    # the very end of the tweet) are kept as they are
    hashtags = tweet['text'].split('#')[1:]
    return [x if ' ' not in x else x.partition(' ')[0].lower() for x in hashtags]

def has_elements(item):
    return len(item) > 0

hashtags_sorted = (rdd_tweets_es
                   .flatMap(get_hashtags) # Extract the list of hashtags from the text
                   .map(lambda hashtag: (hashtag, 1)) # Tuples (hashtag, 1)
                   .reduceByKey(lambda value1, value2: value1 + value2) # Group by hashtag and sum the counts
                   .sortBy(lambda _tuple: _tuple[1], ascending=False) # Sort by count, descending
                  )

hashtags_sorted.persist()
hashtags_sorted_list = hashtags_sorted.collect()
hashtags_sorted_list

[(u'ff', 44),
 (u'cuandomedrogo', 8),
 (u'fb', 6),
 (u'np', 5),
 (u'pinchesnombresculeros', 4),
 (u'followback', 4),
 (u'comiquitasdelainfancia', 3),
 (u'ches', 3),
 (u'trabajo', 3),
 (u'latinoamericac13', 3),
 (u'twitteroff', 3),
 (u'siguemeytesigo', 3),
 (u'twitter', 3),
 (u'nowplaying', 3),
 (u'estanochequiero', 2),
 (u'panama', 2),
 (u'follow1x1', 2),
 (u'talibanazoxcrisuentv', 2),
 (u'followrapido', 2),
 (u'oseaguats', 2),
 (u'cchsur', 2),
 (u'TalibanazoxCrisUenTv', 2),
 (u'empleo', 2),
 (u'fail', 2),
 (u'economia', 2),
 (u'sifueramiultimanoche', 2),
 (u'', 1),
 (u'thingsmenshouldnttexteachother', 1),
 (u'wTF', 1),
 (u'dios', 1),
 (u'Two0F', 1),
 (u'ppsoe', 1),
 (u'paris', 1),
 (u'voluntariado', 1),
 (u'CasosDeLaBidaReal"', 1),
 (u'facebook', 1),
 (u'useyourillusioni', 1),
 (u'casa', 1),
 (u'NovemberRain', 1),
 (u'omaigadauran', 1),
 (u'team', 1),
 (u'teamfollowback', 1),
 (u'cchsur,', 1),
 (u'espregunta', 1),
 (u'Carloscanta2407.11FM', 1),
 (u'htcdesireven', 1),
 (u'90s', 1),
 (u
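Splitting on `#` and cutting at the first space causes several problems, all visible in the list above:

- Trailing punctuation is kept (`cchsur,`, `noesdedios!!`).
- A `#` followed by a space yields an empty hashtag.
- Only chunks containing a space are lower-cased, so `FF`/`ff` and `TalibanazoxCrisUenTv`/`talibanazoxcrisuentv` are counted separately.

Here is a sketch of a regex-based extractor that lower-cases every hashtag. It assumes a hashtag is `#` followed by letters, digits or underscores, which is only an approximation of Twitter's own rules. Another option is the `entities.hashtags` list in the raw tweet JSON (each entry has a `text` field), although `format_tweet` drops that field. `extract_hashtags` is a hypothetical name.

```python
import re

# '#' followed by word characters (letters, digits, underscore; Unicode-aware).
# Approximation: Twitter's real hashtag rules are more involved.
HASHTAG_RE = re.compile(r'#(\w+)', re.UNICODE)


def extract_hashtags(text):
    """Return the lower-cased hashtags in text, in order of appearance."""
    return [tag.lower() for tag in HASHTAG_RE.findall(text)]
```

`extract_hashtags(tweet['text'])` could stand in for `get_hashtags(tweet)` in the Part 4 to 6 pipelines. The counts would change accordingly.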

In [388]:
# Sort users by the summed scope of the hashtags they used

# Lookup table hashtag -> scope, built once on the driver from the collected counts
scope_by_hashtag = dict(hashtags_sorted_list)

def get_scope(value):
    return scope_by_hashtag[value]

hashtags_grouped_by_user_sorted = (rdd_tweets_es
                                   .map(lambda tweet: (tweet['user'], get_hashtags(tweet))) # Tuples (user, [hashtag])
                                   .filter(lambda item: has_elements(item[1])) # Remove tuples without hashtags
                                   .flatMapValues(lambda value: value) # One (user, hashtag) tuple per hashtag
                                   .map(lambda item: (item[0], get_scope(item[1]))) # Tuples (user, scope)
                                   .reduceByKey(lambda v1, v2: v1 + v2) # Group by user and sum the scopes
                                   .sortBy(lambda item: item[1], ascending=False) # Sort by total scope, descending
                                  )

In [389]:
# Top 10 users by the summed scope of the hashtags they used
hashtags_grouped_by_user_sorted.take(10)

[(u'Perro_de_Cris_U', 46),
 (u'issixx', 46),
 (u'luisdiazdeldedo', 45),
 (u'ibotica', 45),
 (u'mundodelcorazon', 45),
 (u'JcPxnditx91', 45),
 (u'Bouquet_es', 44),
 (u'ElPan_', 44),
 (u'Kev_Aponte', 44),
 (u'AmaaliaGo', 44)]
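In this ranking, every hashtag occurrence in a user's tweets adds that hashtag's global count, so a single `#ff` already contributes 44. Here is a small worked example of the same computation on plain Python data. The users, hashtags and counts are made up for illustration, and `scope_by_user` is a hypothetical name.

```python
from collections import defaultdict


def scope_by_user(user_hashtags, scope):
    """Sum, per user, the global scope of every hashtag occurrence they used.

    user_hashtags: iterable of (user, [hashtag, ...]) pairs, one per tweet.
    scope: dict hashtag -> number of appearances across all tweets.
    Returns (user, total) pairs sorted by total (descending), then user.
    """
    totals = defaultdict(int)
    for user, hashtags in user_hashtags:
        for hashtag in hashtags:
            totals[user] += scope[hashtag]
    return sorted(totals.items(), key=lambda pair: (-pair[1], pair[0]))
```

Users without hashtags never appear in the result. This matches the `filter(has_elements)` step in the Spark pipeline.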

## Part 5

In [393]:
# Useless hashtags: those that appear only once (scope = 1)
useless_hashtags = [hashtag for hashtag, scope in hashtags_sorted_list if scope == 1]
useless_hashtags

[u'',
 u'thingsmenshouldnttexteachother',
 u'wTF',
 u'dios',
 u'Two0F',
 u'ppsoe',
 u'paris',
 u'voluntariado',
 u'CasosDeLaBidaReal"',
 u'facebook',
 u'useyourillusioni',
 u'casa',
 u'NovemberRain',
 u'omaigadauran',
 u'team',
 u'teamfollowback',
 u'cchsur,',
 u'espregunta',
 u'Carloscanta2407.11FM',
 u'htcdesireven',
 u'90s',
 u'cazailegalaves',
 u'nowords',
 u'decoraci\xf3n',
 u'turismo',
 u'c\xe1ncer:',
 u'lgbt',
 u'golfbarapestas',
 u'muerteaplatini',
 u'MtvMomentos.',
 u'alwaysonmymind',
 u'culturapreventiva',
 u'mobilemarketing',
 u'notitec',
 u'4950',
 u'avene',
 u'Pasalo',
 u'FB',
 u'2000s',
 u'FF',
 u'yyomequedecomoqueahok',
 u'iphone',
 u'culturateceriana',
 u'noesdedios!!',
 u'estonoesvida',
 u'landashop?',
 u'Nohayderexo!',
 u'vuelta',
 u'europapress',
 u'hola',
 u'jamas',
 u'dormire',
 u'enelbarrionuncafalta',
 u'yosoyviernes',
 u'noalaborto',
 u'laculpaesdel15m',
 u'INDICO',
 u'PalantMrik',
 u'monumentmessi,',
 u'antoine',
 u'30a\xf1osdeoscuridad',
 u'guias',
 u'hoy',
 u

In [396]:
# Rank users by how many useless hashtags they created (a hashtag with scope 1
# is used only once, so the only user who wrote it is its creator)

useless_hashtags_set = set(useless_hashtags)  # Set for constant-time membership tests

def is_useless_hashtag(item):
    return item[1] in useless_hashtags_set

useless_hashtags_creators = (rdd_tweets_es
                             .map(lambda tweet: (tweet['user'], get_hashtags(tweet))) # Tuples (user, [hashtag])
                             .filter(lambda item: has_elements(item[1])) # Remove tuples without hashtags
                             .flatMapValues(lambda value: value) # One (user, hashtag) tuple per hashtag
                             .filter(is_useless_hashtag) # Keep only useless hashtags
                             .map(lambda item: (item[0], 1)) # Tuples (user, 1)
                             .reduceByKey(lambda v1, v2: v1 + v2) # Group by user and sum the counts
                             .sortBy(lambda item: item[1], ascending = False) # Sort by count, descending
                            )

In [397]:
# Top 10 creators of useless hashtags
useless_hashtags_creators.take(10)

[(u'bindupowercoach', 7),
 (u'MiRadioTweet', 5),
 (u'mplzoe', 5),
 (u'patriotdarkwolf', 5),
 (u'COGAM', 4),
 (u'fluisvives', 4),
 (u'Hoyreka', 4),
 (u'Tweet_Pub', 4),
 (u'Chica_Vnzla', 4),
 (u'tuitrafico_feed', 4)]
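Here is a compact sketch of the whole Part 5 computation on plain Python data, using a set for the membership test. The users and hashtags are made up for illustration, and `useless_hashtag_users` is a hypothetical name.

```python
from collections import Counter


def useless_hashtag_users(user_hashtags):
    """Rank users by how many hashtags they used that appear only once overall.

    user_hashtags: list of (user, [hashtag, ...]) pairs, one per tweet
    (a list, because it is traversed twice).
    """
    scope = Counter(tag for _, tags in user_hashtags for tag in tags)
    useless = {tag for tag, count in scope.items() if count == 1}  # set: O(1) lookups
    per_user = Counter(user for user, tags in user_hashtags
                       for tag in tags if tag in useless)
    return sorted(per_user.items(), key=lambda pair: (-pair[1], pair[0]))
```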

## Part 6

In [398]:
# Rank users by the accumulated scope of the hashtags they created
# (creator = the user of the earliest tweet containing the hashtag)

def get_creator(pairs):
    # pairs: iterable of (user, timestamp). Timestamps are the raw created_at strings,
    # compared lexicographically (chronological within this single-day sample)
    return min(pairs, key=lambda item: item[1])[0]

def format_tuples(item):
    return (item[1], (item[0]['user'], item[0]['timestamp']))

hashtag_creators_by_scope = (rdd_tweets_es
                             .sortBy(lambda tweet: tweet['timestamp'], ascending=True)  # Sort by timestamp
                             .map(lambda tweet: (tweet, get_hashtags(tweet))) # Tuples (tweet, [hashtag])
                             .filter(lambda item: has_elements(item[1])) # Remove tuples without hashtags
                             .flatMapValues(lambda value: value) # One (tweet, hashtag) tuple per hashtag
                             .map(format_tuples) # Tuples (hashtag, (user, timestamp))
                             .groupByKey() # Group by hashtag
                             .mapValues(get_creator) # Tuples (hashtag, creator)
                             .map(lambda item: (item[1], get_scope(item[0]))) # Tuples (creator, scope)
                             .reduceByKey(lambda v1, v2: v1 + v2) # Accumulated scope per user
                             .sortBy(lambda item: item[1], ascending=False) # Sort by accumulated scope
                            )

In [399]:
hashtag_creators_by_scope.take(10)

[(u'FabianaMargarit', 44),
 (u'mplzoe', 12),
 (u'WilDonME', 8),
 (u'bindupowercoach', 7),
 (u'cursos_ceticsa', 6),
 (u'AiramMedranoMue', 6),
 (u'sergisbarnosell', 6),
 (u'MiRadioTweet', 5),
 (u'AliciaDinorah', 5),
 (u'KaroCaiaffa', 5)]
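`get_creator` picks the earliest use of each hashtag by comparing raw `created_at` strings. That comparison is only chronological within a single day, as in this one-hour sample. Here is a sketch that compares parsed timestamps instead, and breaks same-second ties by user name so the result is deterministic. It assumes the literal `+0000` offset seen in the data. `get_creator_by_time` is a hypothetical name.

```python
from datetime import datetime

CREATED_AT_FORMAT = '%a %b %d %H:%M:%S +0000 %Y'  # assumes a literal +0000 (UTC) offset


def get_creator_by_time(pairs):
    """Return the user of the earliest (user, created_at) pair.

    Ties on the timestamp (same second) are broken by user name.
    """
    user, _ = min(pairs, key=lambda pair: (datetime.strptime(pair[1], CREATED_AT_FORMAT),
                                           pair[0]))
    return user
```

It would take the place of `get_creator` in the step that reduces each hashtag's grouped (user, timestamp) values to a single creator. Because `min` scans the whole group and the tie-break is explicit, the preliminary `sortBy` on the timestamp is not needed for this step.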