In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
import os, shutil, datetime
from collections import Counter
from operator import add

In [None]:
# Optionally pin the worker interpreter before creating the context:
#   os.environ["PYSPARK_PYTHON"] = "/usr/bin/env python3"
# Run Spark locally with 4 worker threads.
master = "local[4]"
appName = "phase1"
conf = (
    SparkConf()
    .setMaster(master)
    .setAppName(appName)
)

In [None]:
# getOrCreate() returns the already-running context when this cell is re-run,
# instead of failing with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
# Only show warnings and errors from Spark in the notebook output.
sc.setLogLevel(logLevel="WARN")

## Creating RDD

In [None]:
# One element per line of the tab-separated tweet dataset.
rdd = sc.textFile(name="data/geotweets.tsv")

### Creating a 10% sample RDD for quick testing

In [None]:
# 10% sample without replacement; the fixed seed keeps the sample reproducible.
sampled_rdd = rdd.sample(withReplacement=False, fraction=0.1, seed=5)

### Splitting each line on tabs into a list of fields

In [None]:
# Each tweet becomes a list of its tab-separated fields.
rdd_list = rdd.map(lambda line: line.split('\t'))

In [None]:
# Peek at the first parsed record.
first_record = rdd_list.first()
print(first_record)

In [None]:
# Same tab split as rdd_list, applied to the 10% sample.
sampled_rdd_list = sampled_rdd.map(lambda line: line.split('\t'))

In [None]:
# Pair every sampled record with field 1 as its key: (field_1, record).
new_list = sampled_rdd_list.map(lambda fields: (fields[1], fields))
print(new_list.first())

# Task 1

In [None]:
# Each line of the input file is one tweet.
number_of_tweets = rdd.count()
print(number_of_tweets)

In [None]:
# Distinct values of field 6 (the username column).
number_of_users = rdd_list.map(lambda fields: fields[6]).distinct().count()
print(number_of_users)

In [None]:
# Distinct values of field 1 (the country name column).
number_of_countries = rdd_list.map(lambda fields: fields[1]).distinct().count()
print(number_of_countries)

In [None]:
# Distinct values of field 4 (the place name column).
number_of_places = rdd_list.map(lambda fields: fields[4]).distinct().count()
print(number_of_places)

In [None]:
# Distinct values of field 5 (the language column).
number_of_languages = rdd_list.map(lambda fields: fields[5]).distinct().count()
print(number_of_languages)

In [None]:
# Field 11 holds the latitude; parse to float so the comparison is numeric.
minimum_latitude = rdd_list.map(lambda fields: float(fields[11])).min()
print(minimum_latitude)

In [None]:
# Field 12 holds the longitude; parse to float so the comparison is numeric.
minimum_longitude = rdd_list.map(lambda fields: float(fields[12])).min()
print(minimum_longitude)

In [None]:
# Largest latitude (field 11) across all tweets.
maximum_latitude = rdd_list.map(lambda fields: float(fields[11])).max()
print(maximum_latitude)

In [None]:
# Largest longitude (field 12) across all tweets.
maximum_longitude = rdd_list.map(lambda fields: float(fields[12])).max()
print(maximum_longitude)

In [None]:
# Field 10 is the tweet text.
tweet_text = rdd_list.map(lambda x: x[10])
# Printing the RDD object only shows its lineage repr (e.g. "PythonRDD[..] at ..."),
# so show a few actual tweet texts instead.
print(tweet_text.take(5))

In [None]:
# Mean tweet length measured in characters.
tweet_in_characters = tweet_text.map(len)
average_tweet_in_characters = tweet_in_characters.mean()
print(average_tweet_in_characters)

In [None]:
# Mean tweet length in words, where words are the pieces between single spaces
# (so consecutive spaces contribute empty "words").
tweet_in_words = tweet_text.map(lambda text: len(text.split(' ')))
average_tweet_in_words = tweet_in_words.mean()
print(average_tweet_in_words)

### Combining the results into an RDD and writing them to a file

In [None]:
# Task 1 statistics in output order; saveAsTextFile writes one element per line.
task1_values = [
    number_of_tweets,
    number_of_users,
    number_of_countries,
    number_of_places,
    number_of_languages,
    minimum_latitude,
    minimum_longitude,
    maximum_latitude,
    maximum_longitude,
    average_tweet_in_characters,
    average_tweet_in_words,
]
# A single partition gives a single part file in the output directory.
results = sc.parallelize(task1_values).coalesce(1)

resultsPath = 'results/result_1.tsv'
# Clear the output directory of a previous run so the write does not collide with it.
if os.path.isdir(resultsPath):
    shutil.rmtree(resultsPath)
results_tsv = results.saveAsTextFile(resultsPath)

# Task 2

### Counting the number of tweets per country with `countByKey` (the result is a dictionary on the driver, not an RDD)

In [None]:
# country name -> number of tweets, as (country, count) items of a driver-side dict.
new_rdd = rdd_list.map(lambda fields: str(fields[1])).countByValue().items()

### Sorting twice: first alphabetically (ascending) by country name, then numerically (descending) by number of tweets. Python's sort is stable, so countries with the same number of tweets keep their alphabetical order.

In [None]:
# Country names are unique dict keys, so one sort on (-count, name) gives the same
# order as "alphabetical, then stable sort by count descending".
sorted_dict = sorted(new_rdd, key=lambda item: (-item[1], item[0]))

In [None]:
# Show the (country, tweet count) ranking.
print(str(sorted_dict))

### Turning the sorted result back into an RDD of tab-separated lines

In [None]:
# Back to an RDD, rendered as "country<TAB>count" lines.
result_task2_rdd = sc.parallelize(sorted_dict)
result_task2 = result_task2_rdd.map(lambda pair: '\t'.join(map(str, pair)))

### Writing results to text file

In [None]:
resultsPath = 'results/result_2.tsv'

# Remove the output directory of an earlier run before writing again.
if os.path.isdir(resultsPath):
    shutil.rmtree(resultsPath)

# One partition -> one part file.
(
    result_task2
    .coalesce(1)
    .saveAsTextFile(resultsPath)
)

# Task 3

In [None]:
# Countries with at most 10 tweets (counts are integers, so <= 10 is the same as < 11).
countries_under_10 = result_task2_rdd.filter(lambda pair: pair[1] <= 10)

In [None]:
# (country, latitude) and (country, longitude) pairs for every tweet.
countries_with_lat = rdd_list.map(lambda row: (str(row[1]), float(row[11])))
countries_with_lon = rdd_list.map(lambda row: (str(row[1]), float(row[12])))

In [None]:
# Drop every pair whose country key appears in countries_under_10 (<= 10 tweets).
countries_over_10_with_lat = countries_with_lat.subtractByKey(other=countries_under_10)
countries_over_10_with_lon = countries_with_lon.subtractByKey(other=countries_under_10)

In [None]:
def calculateCenter(listWithCoord):
    """Return the arithmetic mean of a list of coordinate values.

    Raises ZeroDivisionError for an empty list.
    """
    total = sum(listWithCoord)
    how_many = len(listWithCoord)
    return total / how_many

In [None]:
# Per-country mean latitude and mean longitude, joined into (country, (lat, lon)).
country_centroid_lat = countries_over_10_with_lat.groupByKey()\
                    .mapValues(lambda coords: calculateCenter(list(coords)))
country_centroid_lon = countries_over_10_with_lon.groupByKey()\
                    .mapValues(lambda coords: calculateCenter(list(coords)))
country_centroid_rdd = country_centroid_lat.join(country_centroid_lon)

In [None]:
# "country<TAB>latitude<TAB>longitude" lines.
result_task3 = country_centroid_rdd.map(
    lambda rec: '\t'.join(map(str, (rec[0], rec[1][0], rec[1][1])))
)

In [None]:
resultsPath = 'results/result_3.tsv'

# Remove the output directory of an earlier run before writing again.
if os.path.isdir(resultsPath):
    shutil.rmtree(resultsPath)

# One partition -> one part file.
(
    result_task3
    .coalesce(1)
    .saveAsTextFile(resultsPath)
)

In [None]:
# Plot the Task 3 centroids on CARTO.
# NOTE(review): 'result_task3_carto_4' is presumably a table uploaded from
# results/result_3.tsv; it is not created in this notebook. TODO confirm.
# `import cartoframes` is required: the original only imported names *from* the
# package, so `cartoframes.CartoContext` raised NameError.
import cartoframes
from cartoframes import Layer, BaseMap, styling

BASEURL = 'https://larshbj.carto.com'
# Never hardcode credentials in a notebook. Read the key from the environment.
# NOTE(review): the key that was previously committed here should be revoked.
APIKEY = os.environ['CARTO_API_KEY']
cc = cartoframes.CartoContext(base_url=BASEURL,
                              api_key=APIKEY)
cc.map(layers=Layer('result_task3_carto_4',
                   size=7),
       interactive=False)

# Task 4

In [None]:
def getLocalTimeHour(timestamp, offset):
    """Return the hour of day ('0'..'23') of a tweet in the tweet's own local time.

    Args:
        timestamp: UTC time of the tweet in milliseconds since the Unix epoch
            (divided by 1000 to get seconds).
        offset: offset from UTC in seconds, added to the epoch seconds to shift
            into the tweet's local time.

    Returns:
        The local hour as a string, e.g. '13'.
    """
    local_seconds = timestamp / 1000.0 + offset
    # The shifted value must be read as UTC. fromtimestamp() without tz would also
    # apply the *machine's* time zone, so results would depend on where this runs.
    local_time = datetime.datetime.fromtimestamp(local_seconds, tz=datetime.timezone.utc)
    return str(local_time.hour)

In [None]:
def getMaxTweetTimeInterval(hour_list):
    """Return the most frequent hour in hour_list as an (hour, count) tuple.

    Ties go to the hour seen first in hour_list (on Python 3.7+, where Counter
    keeps insertion order and the ranking sort is stable). Raises IndexError
    for an empty list.
    """
    ranking = Counter(hour_list).most_common()
    return ranking[0]

In [None]:
# (country, local hour of the tweet), using utc_time (field 0) and the offset in field 8.
rdd_task4 = rdd_list.map(
    lambda row: (str(row[1]), getLocalTimeHour(float(row[0]), float(row[8])))
)

In [None]:
# For each country, the busiest local hour as (hour, count).
# NOTE: groupByKey materialises all hours of a country in one list; counting per
# (country, hour) with reduceByKey first would scale better.
country_time_rdd = rdd_task4.groupByKey()\
                    .mapValues(lambda hours: getMaxTweetTimeInterval(list(hours)))

In [None]:
# "country<TAB>hour<TAB>tweet count" lines.
result_task4 = country_time_rdd.map(
    lambda rec: '\t'.join(map(str, (rec[0], rec[1][0], rec[1][1])))
)

In [None]:
resultsPath = 'results/result_4.tsv'

# Remove the output directory of an earlier run before writing again.
if os.path.isdir(resultsPath):
    shutil.rmtree(resultsPath)

# One partition -> one part file.
(
    result_task4
    .coalesce(1)
    .saveAsTextFile(resultsPath)
)

# Task 5

In [None]:
def findNumberOfTweetsAndSort(rdd):
    """Count records per place name (field 4) and return them ranked.

    Returns a driver-side list of (place_name, count) tuples. It is ordered by
    count descending, with equal counts ordered alphabetically by place name.
    """
    counts = rdd.map(lambda fields: (str(fields[4]), 1)).countByKey()
    # Place names are unique dict keys, so one (-count, name) sort matches
    # "alphabetical, then stable sort by count descending".
    return sorted(counts.items(), key=lambda item: (-item[1], item[0]))

In [None]:
# Tweets whose country code (field 2) is US and whose place type (field 3) is 'city'.
us_city_tweets = rdd_list.filter(lambda row: row[2] == 'US' and row[3] == 'city')
rdd_task5 = findNumberOfTweetsAndSort(us_city_tweets)

In [None]:
# Ranked cities back into an RDD, rendered as "city<TAB>count" lines.
result_task5_rdd = sc.parallelize(rdd_task5)
result_task5 = result_task5_rdd.map(lambda pair: '\t'.join(map(str, pair)))

In [None]:
resultsPath = 'results/result_5.tsv'

# Remove the output directory of an earlier run before writing again.
if os.path.isdir(resultsPath):
    shutil.rmtree(resultsPath)

# One partition -> one part file.
(
    result_task5
    .coalesce(1)
    .saveAsTextFile(resultsPath)
)

# Task 6

In [None]:
# One stopword per line. textFile already yields one element per line, so the
# elements never contain '\n' and no further splitting is needed.
stopwords_rdd = sc.textFile("data/stop_words.txt")
stopwords_list = stopwords_rdd.map(str)

In [None]:
def filterTweetsByLength(rdd):
    """Keep only elements of rdd with length of at least 2 (drops empty and 1-char tokens)."""
    def long_enough(token):
        return len(token) > 1
    return rdd.filter(long_enough)

In [None]:
def filterTweetsByStopwords(rdd, stopwords):
    """Return the elements of rdd that do not occur in the `stopwords` RDD (via RDD.subtract)."""
    without_stopwords = rdd.subtract(stopwords)
    return without_stopwords

In [None]:
# Every space-separated token from tweets whose country code (field 2) is US.
rdd_task6_tweets = (
    rdd_list
    .filter(lambda row: row[2] == 'US')
    .flatMap(lambda row: str(row[10]).split(' '))
)
print(rdd_task6_tweets.take(5))

In [None]:
# Word frequencies for US tweets: drop tokens shorter than 2 characters,
# lower-case the rest, remove stopwords, then count each word.
# An explicit lambda is used instead of the module-level name `add`, which other
# cells in this notebook may rebind; relying on it made the cell order-dependent.
task6_freq_words_list = rdd_task6_tweets.filter(lambda x: len(x) >= 2)\
                    .map(lambda x: x.lower())\
                    .subtract(stopwords_list)\
                    .map(lambda x: (x, 1))\
                    .reduceByKey(lambda a, b: a + b)\
                    .collect()

In [None]:
# Most frequent words first. The sort is stable, so words with equal counts keep their collected order.
task6_freq_words_list_sorted = sorted(task6_freq_words_list, key=lambda pair: -pair[1])

In [None]:
# The ten most frequent words as "word<TAB>count" lines.
top_ten_words = task6_freq_words_list_sorted[:10]
result_task6 = sc.parallelize(top_ten_words)\
                    .map(lambda pair: '\t'.join(map(str, pair)))

In [None]:
resultsPath = 'results/result_6.tsv'

# Remove the output directory of an earlier run before writing again.
if os.path.isdir(resultsPath):
    shutil.rmtree(resultsPath)

# One partition -> one part file.
(
    result_task6
    .coalesce(1)
    .saveAsTextFile(resultsPath)
)

# Task 7

In [None]:
# The first five entries of the Task 5 ranking. NOTE: the elements are the full
# (place_name, count) tuples, so a join against them keys on place_name.
five_cities = result_task5_rdd.zipWithIndex()\
                .filter(lambda entry_and_rank: entry_and_rank[1] < 5)\
                .keys()
# (place_name, tweet_text) for every tweet.
# NOTE: this rebinds tweet_text, which Task 1 used for an RDD of bare tweet texts.
tweet_text = rdd_list.map(lambda row: (row[4], row[10]))

In [None]:
# Stopwords as a broadcast set, so each (city, word) pair can be filtered by its word.
# The earlier approach subtracted (0, stopword) pairs. Those never equal a
# (city, word) pair, so no stopwords were actually removed.
stopwords_bc = sc.broadcast(set(stopwords_list.collect()))

# (city, lower-cased word) for every word of at least 2 characters in tweets from the
# top-five cities, excluding stopwords.
tweets_by_city = tweet_text.join(five_cities)\
                        .map(lambda x: (x[0], x[1][0]))\
                        .flatMapValues(lambda x: x.split(' '))\
                        .filter(lambda x: len(x[1]) >= 2)\
                        .mapValues(lambda word: word.lower())\
                        .filter(lambda x: x[1] not in stopwords_bc.value)


In [None]:
def to_dict(word):
    """combineByKey createCombiner: start a per-city word-count dict from its first word."""
    return {word: 1}


def add_word(city, word):
    """combineByKey mergeValue: count one more occurrence of `word` in the dict `city`.

    Mutates and returns `city`. It is named add_word rather than add so that it
    does not shadow `operator.add` imported at the top of the notebook.
    """
    city[word] = city.get(word, 0) + 1
    return city


def merge(dict1, dict2):
    """combineByKey mergeCombiners: combine two partial word-count dicts for one city.

    Counts of words present in both dicts are summed. The original returned
    {**dict1, **dict1}, which silently dropped everything counted in dict2's partition.
    Returns a new dict and leaves both inputs unmodified.
    """
    merged = dict(dict1)
    for word, count in dict2.items():
        merged[word] = merged.get(word, 0) + count
    return merged


# (city, {word: count}) pairs collected to the driver.
counted_tweets_by_city = tweets_by_city.combineByKey(to_dict, add_word, merge)\
        .collect()

In [None]:
import operator

# For each city, its ten most frequent words flattened into one tab-separated
# string: "word1<TAB>count1<TAB>word2<TAB>count2...".
common_words = []
for city_name, word_counts in counted_tweets_by_city:
    top_words = sorted(word_counts.items(), key=operator.itemgetter(1), reverse=True)[:10]
    flattened = '\t'.join('\t'.join(map(str, word_and_count)) for word_and_count in top_words)
    common_words.append((city_name, flattened))

In [None]:
# "city<TAB>word1<TAB>count1..." lines.
result_task7 = sc.parallelize(common_words)\
                    .map(lambda pair: '\t'.join(map(str, pair)))

In [None]:
resultsPath = 'results/result_7.tsv'

# Remove the output directory of an earlier run before writing again.
if os.path.isdir(resultsPath):
    shutil.rmtree(resultsPath)

# One partition -> one part file.
(
    result_task7
    .coalesce(1)
    .saveAsTextFile(resultsPath)
)

# Task 8

In [None]:
# getOrCreate() attaches the session to the SparkContext `sc` that is already
# running (local[4]) instead of starting a new one, so a master("local") setting
# here was only misleading. The placeholder "spark.some.config.option" copied from
# the Spark docs is dropped.
spark = SparkSession \
    .builder \
    .appName("phase1_dataframe") \
    .getOrCreate()

In [None]:
# Same tab split as rdd_list, turned into Rows so the data can be queried with Spark SQL.
parts = rdd.map(lambda l: l.split('\t'))
tweets = parts.map(lambda x: Row(
    utc_time=x[0],
    country_name=x[1],
    country_code=x[2],
    place_type=x[3],
    place_name=x[4],
    language=x[5],
    username=x[6],
    user_screen_name=x[7],
    timezone_offset=x[8],
    number_of_friends=x[9],
    tweet_text=x[10],
    # Coordinates are parsed to float, as in the RDD tasks, so SQL min/max compare
    # numerically. As strings they compare lexicographically (e.g. '10.5' < '9.1').
    latitude=float(x[11]),
    longitude=float(x[12]),
))
df = spark.createDataFrame(tweets)
df.createOrReplaceTempView("tweets")

In [None]:
# Task 1 statistics recomputed with Spark SQL over the "tweets" view.
# - distinct_languages now counts the language column. It previously counted
#   country_name again.
# - latitude/longitude are cast to double so min/max are numeric even if the view
#   stores them as strings (string comparison is lexicographic).
sql = """
    select count(*) as number_of_tweets,
        count(distinct(username)) as distinct_users,
        count(distinct(country_name)) as distinct_countries,
        count(distinct(place_name)) as distinct_places,
        count(distinct(`language`)) as distinct_languages,
        min(cast(latitude as double)) as minimum_latitude,
        min(cast(longitude as double)) as minimum_longitude,
        max(cast(latitude as double)) as maximum_latitude,
        max(cast(longitude as double)) as maximum_longitude
    from tweets
"""


df_sql = spark.sql(sql)
df_sql.show()