In [1]:
import pyspark as ps
import json
spark = ps.sql.SparkSession.builder \
            .master("local[4]") \
            .appName("df case study") \
            .getOrCreate()
sc = spark.sparkContext


In [2]:

import pandas as pd 
import numpy as np 
import scipy.stats as stats
import matplotlib.pyplot as plt 
from pyspark.sql.types import *

In [3]:
tweets_df = spark.read.json('../data/french_tweets.json').sample(False, 0.2)

In [None]:
tweets_df.show(1)

In [4]:
tweets_df.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |

In [5]:
tweet_pd = tweets_df.toPandas()

In [None]:
tweet_pd.head()

In [None]:
tweet_pd.info()

In [None]:
tweets_df.take(1)

In [12]:
tweets_df.createOrReplaceTempView('tweets_df')
result = spark.sql("""SELECT geo.coordinates
FROM tweets_df LATERAL VIEW explode(entities.hashtags.text) AS hashtag

 """).collect()
result[:10]

[Row(coordinates=[48.89007918, 2.42839066]),
 Row(coordinates=[48.89007918, 2.42839066]),
 Row(coordinates=[48.89007918, 2.42839066]),
 Row(coordinates=[48.89007918, 2.42839066]),
 Row(coordinates=[48.89007918, 2.42839066]),
 Row(coordinates=[48.89007918, 2.42839066]),
 Row(coordinates=[48.89007918, 2.42839066]),
 Row(coordinates=[49.598666, 6.1330168]),
 Row(coordinates=[49.598666, 6.1330168]),
 Row(coordinates=None)]

In [7]:
tweets_df.createOrReplaceTempView('tweets_df')
result = spark.sql("""SELECT entities.hashtags.text, user.name, place.country, retweet_count, text
FROM tweets_df LATERAL VIEW explode(entities.hashtags.text) AS hashtag
WHERE hashtag is not null
 """).collect()
result[:10]

[Row(text=['AllEyezOnIt', 'Neochrome', 'Eriah', 'Loin', 'Youtube', 'YoutubeNeochrome', 'RepDom'], name='AllEyezOnIt', country='France', retweet_count=0, text='#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube #YoutubeNeochrome #RepDom cc @neochromeprod… https://t.co/UymjEOk2Py'),
 Row(text=['AllEyezOnIt', 'Neochrome', 'Eriah', 'Loin', 'Youtube', 'YoutubeNeochrome', 'RepDom'], name='AllEyezOnIt', country='France', retweet_count=0, text='#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube #YoutubeNeochrome #RepDom cc @neochromeprod… https://t.co/UymjEOk2Py'),
 Row(text=['AllEyezOnIt', 'Neochrome', 'Eriah', 'Loin', 'Youtube', 'YoutubeNeochrome', 'RepDom'], name='AllEyezOnIt', country='France', retweet_count=0, text='#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube #YoutubeNeochrome #RepDom cc @neochromeprod… https://t.co/UymjEOk2Py'),
 Row(text=['AllEyezOnIt', 'Neochrome', 'Eriah', 'Loin', 'Youtube', 'YoutubeNeochrome', 'RepDom'], name='AllEyezOnIt', country='France', retweet_count=0, text='#AllEyezOn

In [None]:
tweets_rdd = sc.textFile('../data/french_tweets.json').sample(False, 0.2)


In [10]:
# This takes a query result and puts it in a simpler 2D table.
result = spark.sql("""SELECT id,
   created_at,
   retweet_count,
   text,
   user.id,
   user.name
FROM tweets_df LATERAL VIEW explode(entities.hashtags.text) AS hashtag
WHERE hashtag is not null
LIMIT 10
 """).collect()
# result
# type(result)
tweet_schema = StructType( [
    StructField('id',StringType(),True),
    StructField('created_at', StringType(), True),
    StructField('retweet_count',StringType(),True),
    StructField('text',StringType(),True),
    StructField('userid', StringType(), True),
    StructField('username', StringType(), True)
])
# feed that into a DataFrame
#df1 = schema.createDataFrame(rdd_sales,ht_schema)
df1 = spark.createDataFrame(data=result,schema=tweet_schema)
# show the result
df1.show()

+------------------+--------------------+-------------+--------------------+----------+-------------------+
|                id|          created_at|retweet_count|                text|    userid|           username|
+------------------+--------------------+-------------+--------------------+----------+-------------------+
|857225447821725696|Wed Apr 26 13:30:...|            0|#AllEyezOnIt #Neo...| 917945400|        AllEyezOnIt|
|857225447821725696|Wed Apr 26 13:30:...|            0|#AllEyezOnIt #Neo...| 917945400|        AllEyezOnIt|
|857225447821725696|Wed Apr 26 13:30:...|            0|#AllEyezOnIt #Neo...| 917945400|        AllEyezOnIt|
|857225447821725696|Wed Apr 26 13:30:...|            0|#AllEyezOnIt #Neo...| 917945400|        AllEyezOnIt|
|857225447821725696|Wed Apr 26 13:30:...|            0|#AllEyezOnIt #Neo...| 917945400|        AllEyezOnIt|
|857225447821725696|Wed Apr 26 13:30:...|            0|#AllEyezOnIt #Neo...| 917945400|        AllEyezOnIt|
|857225447821725696|Wed Apr 

In [45]:
tweets_df.createOrReplaceTempView('tweets_df')
geo_results = spark.sql("""SELECT id, geo.coordinates[0] as long, geo.coordinates[1] as lat
    FROM tweets_df LATERAL VIEW explode(entities.hashtags.text) AS hashtag
    WHERE hashtag is not null 
    """).collect()
geo_schema = StructType( [
    StructField('Tweet_ID', StringType(), True),
    StructField('Longitude',FloatType(),True),
    StructField('Lattitude',FloatType(),True)] )
#create cleaned location dataframe
geo_df = spark.createDataFrame(data=geo_results, schema=geo_schema)

In [46]:
geo_df.show(10)

+------------------+---------+---------+
|          Tweet_ID|Longitude|Lattitude|
+------------------+---------+---------+
|857225447821725696| 48.89008|2.4283907|
|857225447821725696| 48.89008|2.4283907|
|857225447821725696| 48.89008|2.4283907|
|857225447821725696| 48.89008|2.4283907|
|857225447821725696| 48.89008|2.4283907|
|857225447821725696| 48.89008|2.4283907|
|857225447821725696| 48.89008|2.4283907|
|857225518504189953|49.598667|6.1330166|
|857225518504189953|49.598667|6.1330166|
|857225530122350592|     null|     null|
+------------------+---------+---------+
only showing top 10 rows



In [51]:
tweets_df.createOrReplaceTempView('tweets_df')
ht_results = result = spark.sql("""SELECT id, hashtag
    FROM tweets_df LATERAL VIEW explode(entities.hashtags.text) AS hashtag
    WHERE hashtag is not null
    """).collect()
ht_schema = StructType( [
    StructField('Tweet_ID', StringType(), True),
    StructField('Hashtag',StringType(),True)] )
#create cleaned location dataframe
ht_df = spark.createDataFrame(data=ht_results, schema=ht_schema)

In [35]:
ht_df.show(10)

+------------------+----------------+
|          Tweet_ID|         Hashtag|
+------------------+----------------+
|857225447821725696|     AllEyezOnIt|
|857225447821725696|       Neochrome|
|857225447821725696|           Eriah|
|857225447821725696|            Loin|
|857225447821725696|         Youtube|
|857225447821725696|YoutubeNeochrome|
|857225447821725696|          RepDom|
|857225518504189953|          IC2113|
|857225518504189953|           CFL50|
|857225530122350592|       Whirlpool|
+------------------+----------------+
only showing top 10 rows



In [53]:
               
result = spark.sql("""
                    SELECT id, created_at, user.name, text, lang, retweet_count
                    FROM tweets_df LATERAL VIEW explode(entities.hashtags.text) AS hashtag
                    WHERE hashtag is not null
                    LIMIT 10
                    """).collect()
tweet_schema = StructType( [
    StructField('Tweet_ID', StringType(), True),
    StructField('Created_At', StringType(), True),
    StructField('Username',StringType(),True),
    StructField('Tweet_Text', StringType(), True),
    StructField('Tweet_Language', StringType(), True),
    StructField('Retweet_Count', IntegerType(), True)] )
#create cleaned tweet dataframe
clean_tweet_df = spark.createDataFrame(result, tweet_schema)

In [60]:
def text_all_cleaned(x):
    x = x.lower()
    x = ' '.join([word for word in x.split(' ') if word not in stop_words])
    x = x.encode('ascii', 'ignore').decode()
    x = re.sub(r'https*\S+', ' ', x)
    x = re.sub(r'@\S+', ' ', x)
    x = re.sub(r'#\S+', ' ', x)
    x = re.sub(r'\'\w+', '', x)
    x = re.sub('[%s]' % re.escape(string.punctuation), ' ', x)
    x = re.sub(r'\w*\d+\w*', '', x)
    x= re.sub(r'\s{2,}', ' ', x)
    return x

In [62]:
clean_tweet_df.map(lambda x:  text_all_cleaned(x))

AttributeError: 'DataFrame' object has no attribute 'map'

In [58]:
pd_t = clean_tweet_df.toPandas().head(10)
pd_t.head()

Unnamed: 0,Tweet_ID,Created_At,Username,Tweet_Text,Tweet_Language,Retweet_Count
0,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0
1,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0
2,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0
3,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0
4,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0


In [78]:
import re
import nltk
import string
from nltk.corpus import stopwords
stopwords = stopwords

In [80]:
def text_all_cleaned(x):
    x = x.lower()
    x = ' '.join([word for word in x.split(' ') if word not in stopwords])
    x = x.encode('ascii', 'ignore').decode()
    x = re.sub(r'https*\S+', ' ', x)
    x = re.sub(r'@\S+', ' ', x)
    x = re.sub(r'#\S+', ' ', x)
    x = re.sub(r'\'\w+', '', x)
    x = re.sub('[%s]' % re.escape(string.punctuation), ' ', x)
    x = re.sub(r'\w*\d+\w*', '', x)
    x= re.sub(r'\s{2,}', ' ', x)
    return x


In [73]:
pd_t.head(10)

Unnamed: 0,Tweet_ID,Created_At,Username,Tweet_Text,Tweet_Language,Retweet_Count
0,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0
1,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0
2,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0
3,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0
4,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0
5,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0
6,857225447821725696,Wed Apr 26 13:30:47 +0000 2017,AllEyezOnIt,#AllEyezOnIt #Neochrome #Eriah #Loin #Youtube ...,und,0
7,857225518504189953,Wed Apr 26 13:31:04 +0000 2017,Train Travel Alerts,🚂 #IC2113 BRUXELLES-MIDI (Belgium)-LUX\nDepart...,fr,0
8,857225518504189953,Wed Apr 26 13:31:04 +0000 2017,Train Travel Alerts,🚂 #IC2113 BRUXELLES-MIDI (Belgium)-LUX\nDepart...,fr,0
9,857225530122350592,Wed Apr 26 13:31:07 +0000 2017,Mathieu Magnaudeix,"A #Whirlpool #Amiens, Katia Dubois, déléguée d...",fr,0
