In [1]:
# imports
import os

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.conf import SparkConf
import pyspark.sql.functions as func

import pandas as pd

In [2]:
# Runtime configuration comes from the environment so that no bucket name or
# credential is stored in the notebook. A missing variable yields ''.
BUCKET_S3, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY = (
    os.environ.get(variable, '')
    for variable in ('BUCKET_S3', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
)

In [3]:
# Input locations for the three CSV files, addressed through the s3a:// scheme.
# Only a leading "s3://" scheme is rewritten: a blanket replace('s3', 's3a')
# would also alter any "s3" inside the bucket name or key and would turn an
# already-correct "s3a://" URL into "s3aa://".
bucket_s3a = ('s3a://' + BUCKET_S3[len('s3://'):]) if BUCKET_S3.startswith('s3://') else BUCKET_S3

path_categories = "{}/data/categories.csv".format(bucket_s3a)
path_cities = "{}/data/cities.csv".format(bucket_s3a)
path_groups = "{}/data/groups.csv".format(bucket_s3a)

In [4]:
# Create a Spark session named 'my_etl'. The hadoop-aws package is requested
# through spark.jars.packages so the S3 filesystem classes are on the classpath.
spark_conf = (
    SparkConf()
    .setAppName('my_etl')
    .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.0')
)
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

# Handle to the underlying SparkContext, used below to reach the Hadoop configuration.
sc = spark.sparkContext

In [5]:
# Configure S3 access on the Hadoop configuration used by this Spark context.
hadoop_conf = sc._jsc.hadoopConfiguration()

# Legacy s3n:// settings, kept unchanged for any code that still uses that scheme.
# NOTE(review): "multiobjectdelete.enable" appears to be documented only under
# fs.s3a.* -- confirm whether the s3n spelling has any effect.
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.multiobjectdelete.enable", "false")
hadoop_conf.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY_ID)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_ACCESS_KEY)

# Every path in this notebook uses the s3a:// scheme, and the S3A connector
# reads its credentials from fs.s3a.* keys rather than the fs.s3n.* keys above.
# They are only set when both values are present, so an empty environment does
# not override other credential sources.
if AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
    hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
    hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)

In [6]:
# Load the categories CSV: the first row is the header and column types are
# inferred from the data.
df_categories = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(path_categories)
)

In [7]:
# Print the column names and inferred types of the categories data.
df_categories.printSchema()

root
 |-- category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- shortname: string (nullable = true)
 |-- sort_name: string (nullable = true)



In [8]:
# Load the cities CSV: the first row is the header and column types are
# inferred from the data.
df_cities = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(path_cities)
)

In [9]:
# Print the column names and inferred types of the cities data.
df_cities.printSchema()

root
 |-- city: string (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- distance: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- localized_country_name: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- member_count: integer (nullable = true)
 |-- ranking: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)



In [10]:
# Load the groups CSV: the first row is the header and column types are
# inferred from the data.
df_groups = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(path_groups)
)

In [11]:
# Print the column names and inferred types of the groups data.
df_groups.printSchema()

root
 |-- group_id: integer (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- category.name: string (nullable = true)
 |-- category.shortname: string (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- description: string (nullable = true)
 |-- group_photo.base_url: string (nullable = true)
 |-- group_photo.highres_link: string (nullable = true)
 |-- group_photo.photo_id: integer (nullable = true)
 |-- group_photo.photo_link: string (nullable = true)
 |-- group_photo.thumb_link: string (nullable = true)
 |-- group_photo.type: string (nullable = true)
 |-- join_mode: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- link: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- group_name: string (nullable = true)
 |-- organizer.member_id: integer (nullable = true)
 |-- organizer.na

In [12]:
# Attach city attributes to each group; groups without a matching city_id are
# kept (left outer join).
# NOTE(review): this result is not referenced by the later cells visible here.
df_groups_city = df_groups.join(df_cities, on="city_id", how="left")

In [13]:
# Keep only the two columns needed for the per-city summary.
df_city_groups = df_groups.select(['city', 'group_id'])

In [14]:
# Number of group rows per city; the result has columns 'city' and 'count'.
df_summary_city_groups = (
    df_city_groups
    .groupby('city')
    .count()
)

In [23]:
# Output location for the per-city summary, addressed through the s3a:// scheme.
# Only a leading "s3://" scheme is rewritten: a blanket replace('s3', 's3a')
# would also alter any "s3" inside the bucket name and would turn an
# already-correct "s3a://" URL into "s3aa://".
path_to_write = "{}/output_data/summary_city_groups".format(
    ('s3a://' + BUCKET_S3[len('s3://'):]) if BUCKET_S3.startswith('s3://') else BUCKET_S3
)

In [None]:
# Persist the per-city summary as Parquet. The default save mode raises an
# error when the target already exists, which makes this cell fail on every
# re-run of the notebook; overwrite mode keeps the cell re-runnable.
df_summary_city_groups.write.mode("overwrite").parquet(path_to_write)

In [15]:
# Convert the Spark summary into a pandas DataFrame for local inspection and export.
dfp_summary = df_summary_city_groups.toPandas()

In [16]:
# Display the cities ordered from most to fewest groups (dfp_summary itself is
# not modified; the sorted frame is only shown).
dfp_summary.sort_values(by='count', ascending=False)

Unnamed: 0,city,count
8,New York,8565
1,San Francisco,4555
2,Chicago,3168
6,South San Francisco,19
4,West New York,11
3,Chicago Ridge,5
5,West Chicago,4
7,Chicago Heights,2
0,North Chicago,1


In [None]:
# Preview the pandas summary. The original cell held the undefined name
# `dfp_suma`, which raises NameError and stops a full run of the notebook.
dfp_summary.head()

In [None]:
# Export the per-city summary to a local CSV without the pandas index column.
# The output directory is created first so the write does not fail when it is
# missing.
output_csv_path = "../output_data/summary_city_groups.csv"
output_csv_dir = os.path.dirname(output_csv_path)
if not os.path.isdir(output_csv_dir):
    os.makedirs(output_csv_dir)
dfp_summary.to_csv(output_csv_path, index=False)