In [1]:
import pyspark as ps
import json
import datetime
import numpy as np
from pyspark.sql.types import *

In [2]:
# Local Spark session backed by 4 worker threads ("local[4]"); `sc` is its
# SparkContext, used below for the RDD-level JSON parsing.
spark = ps.sql.SparkSession.builder.master("local[4]").appName("sparkSQL exercise").getOrCreate()
sc = spark.sparkContext

In [3]:
def convert_to_dict(row):
    """Decode one line of the tweets file as JSON.

    Parameters
    ----------
    row : str
        A single line, as produced by ``sc.textFile``.

    Returns
    -------
    object or None
        The decoded JSON value (a dict for a tweet line), or ``None`` when the
        line is not valid JSON or is not a string, so callers can filter it out.
    """
    try:
        return json.loads(row)
    except (ValueError, TypeError):
        # json.JSONDecodeError subclasses ValueError; TypeError covers non-string
        # input such as None. A bare `except:` would also swallow
        # KeyboardInterrupt/SystemExit and hide unrelated bugs.
        return None

def parse_tweet(tweet_dict):
    """Flatten one decoded tweet into a positional row tuple.

    Parameters
    ----------
    tweet_dict : dict
        A tweet decoded by ``convert_to_dict``. Must contain ``created_at``,
        ``text``, ``user`` (with ``id``, ``followers_count``, ``verified``),
        ``retweet_count``, ``favorite_count`` and ``lang``. ``place`` may be
        missing or ``None``.

    Returns
    -------
    tuple
        (user_id, followers, verified, text, year, month, day, hour, minute,
         retweets, favorites, lang, place_type, place_name, place_full_name,
         country_code, country). The five place fields are ``None`` when the
        tweet carries no place.

    Raises
    ------
    KeyError
        If a required (non-place) field is missing.
    ValueError
        If ``created_at`` does not match the expected format.
    """
    # created_at looks like 'Wed Apr 26 13:30:45 +0000 2017'.
    create_date = datetime.datetime.strptime(tweet_dict['created_at'], '%a %b %d %H:%M:%S %z %Y')
    text = tweet_dict['text']
    user = tweet_dict['user']
    user_id = int(user['id'])
    followers = int(user['followers_count'])
    verified = bool(user['verified'])
    retweets = int(tweet_dict['retweet_count'])
    favorites = int(tweet_dict['favorite_count'])
    lang = tweet_dict['lang']

    # A tweet without location may have `place` set to None (or omit it);
    # indexing into None would raise and fail the whole Spark job, so fall back
    # to an empty dict and let every place column become None (nullable).
    place = tweet_dict.get('place') or {}
    place_type = place.get('place_type')
    place_name = place.get('name')
    place_full_name = place.get('full_name')
    country_code = place.get('country_code')
    country = place.get('country')

    return (user_id, followers, verified, text,
            create_date.year, create_date.month,
            create_date.day, create_date.hour,
            create_date.minute, retweets,
            favorites, lang, place_type,
            place_name, place_full_name, country_code, country)
    

In [4]:
# One RDD element per line of the file; each line is expected to hold one JSON-encoded tweet.
rdd = sc.textFile(name='data/french_tweets.json')

In [5]:
# Decode every line and drop the ones convert_to_dict could not parse (it returns None).
# Elements are still raw tweet dicts; they need parse_tweet to become rows matching `schema`.
filtered_rdd = rdd.map(convert_to_dict).filter(lambda tweet: tweet is not None)

In [6]:
# Peek at one decoded tweet to see the raw JSON structure parse_tweet reads from.
filtered_rdd.take(num=1)

[{'created_at': 'Wed Apr 26 13:30:45 +0000 2017',
  'id': 857225437122097152,
  'id_str': '857225437122097152',
  'text': 'Je comprends pas trop la fin de 13 reasons why 😓',
  'source': '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>',
  'truncated': False,
  'in_reply_to_status_id': None,
  'in_reply_to_status_id_str': None,
  'in_reply_to_user_id': None,
  'in_reply_to_user_id_str': None,
  'in_reply_to_screen_name': None,
  'user': {'id': 3137428743,
   'id_str': '3137428743',
   'name': 'TAÏNA♡',
   'screen_name': 'tainabmn',
   'location': None,
   'url': None,
   'description': None,
   'protected': False,
   'verified': False,
   'followers_count': 318,
   'friends_count': 406,
   'listed_count': 2,
   'favourites_count': 777,
   'statuses_count': 3251,
   'created_at': 'Sun Apr 05 16:03:00 +0000 2015',
   'utc_offset': None,
   'time_zone': None,
   'geo_enabled': True,
   'lang': 'fr',
   'contributors_enabled': False,
   'is_translator': Fa

In [17]:
# Spark schema for the row tuples built by parse_tweet: one (column name, type)
# pair per tuple position, in the same order. Every column is nullable.
# parse_tweet yields user_id as a Python int, but it is declared StringType here,
# so the id ends up as a string column.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('user_id', StringType()),
        ('followers', IntegerType()),
        ('verified', BooleanType()),
        ('text', StringType()),
        ('year', IntegerType()),
        ('month', IntegerType()),
        ('day', IntegerType()),
        ('hour', IntegerType()),
        ('minute', IntegerType()),
        ('retweets', IntegerType()),
        ('favorites', IntegerType()),
        ('lang', StringType()),
        ('place_type', StringType()),
        ('place_name', StringType()),
        ('place_full_name', StringType()),
        ('country_code', StringType()),
        ('country', StringType()),
    )
])

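Python types produced by `parse_tweet` for each column:

- user_id -> int (declared StringType in the schema, so it is stored as a string)
- followers -> int
- verified -> bool
- text -> string
- create_date.year -> int
- create_date.month -> int
- create_date.day -> int
- create_date.hour -> int
- create_date.minute -> int
- retweets -> int
- favorites -> int
- lang -> string
- place_type -> string
- place_name -> string
- place_full_name -> string
- country_code -> string
- country -> string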


In [8]:
# `filtered_rdd` holds raw tweet dicts. With a StructType schema, Spark would look
# dict rows up by column name and leave almost every column null. Flatten each
# tweet into the positional tuple `schema` describes first, skipping lines that
# failed to decode (None).
filtered_df = spark.createDataFrame(
    filtered_rdd.filter(lambda tweet: tweet is not None).map(parse_tweet),
    schema,
)

In [9]:
# Confirm the DataFrame picked up the explicit `schema` (column names, types, nullability).
filtered_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- followers: integer (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- text: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- retweets: integer (nullable = true)
 |-- favorites: integer (nullable = true)
 |-- lang: string (nullable = true)
 |-- place_type: string (nullable = true)
 |-- place_name: string (nullable = true)
 |-- place_full_name: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)



In [10]:
# Preview the first 5 rows; long strings such as `text` are truncated by default.
filtered_df.show(n=5)

+------------------+---------+--------+--------------------+----+-----+---+----+------+--------+---------+----+----------+----------+---------------+------------+-------+
|           user_id|followers|verified|                text|year|month|day|hour|minute|retweets|favorites|lang|place_type|place_name|place_full_name|country_code|country|
+------------------+---------+--------+--------------------+----+-----+---+----+------+--------+---------+----+----------+----------+---------------+------------+-------+
|        3137428743|      318|   false|Je comprends pas ...|2017|    4| 26|  13|    30|       0|        0|  fr|      city|    Dozulé| Dozulé, France|          FR| France|
|        2586505687|      173|   false|@julesbl99 travai...|2017|    4| 26|  13|    30|       0|        0|  fr|      city|   Orvault|Orvault, France|          FR| France|
|805805743316357120|        5|   false|@lecho_fr @lasauc...|2017|    4| 26|  13|    30|       0|        0|  fr|      city|      Lucé|   Lucé, Fra