In [28]:
import mongoengine
import os
import configparser
import requests
import json
import numpy as np
import pandas as pd
from opencage.geocoder import OpenCageGeocode
from datetime import datetime

Configparser implementation

In [8]:
config = configparser.ConfigParser()
config.read('config/meetup.cfg')
os.environ['MEETUP_API_KEY'] = config['MEETUP']['API_KEY']
os.environ['OPENCAGE_KEY']=config['OPENCAGE']['KEY']

geocode = OpenCageGeocode(os.environ['OPENCAGE_KEY'])

In [9]:
class Event:
    
    def __init__(self,
                params,
                url_path):
        self.params = params
        self.url_path = url_path
        
    
    def get_event(self, *args, **kwargs):
        
        r = requests.get(self.url_path, params=self.params)
        request_string = r.text
        
        return request_string

In [10]:
default_args = dict(
    country='United Stated',
    key= os.environ['MEETUP_API_KEY'],
    topic='Python'
)

url_meetup_request = "https://api.meetup.com/2/open_events"
events = Event(params=default_args, url_path=url_meetup_request)
response = events.get_event()

In [11]:
json_response = json.loads(response)
data = json_response['results']
len(data)

198

In [31]:
columns = ['id', 'date', 'year', 'month', 'day', 'country', 'city', 'state,', 'address', 'meetup_name', 'meetup_group_name', 'description', 'event_url', 'yes_rsvp_count', 'status']
id, date, year, month, day, country, city, state, address, meetup_name, meetup_group_name, description, event_url, yes_rsvp_count, status = ([] for i in range(15))

for label in data:
    date_event = datetime.fromtimestamp(label['time'] / 1000.0)
    
    id.append(label['id'])
    date.append(date_event)
    year.append(date_event.year)
    month.append(date_event.month)
    day.append(date_event.year)
    
    if label.get('venue'):
        country.append(label['venue'].get('country'))
        city.append(label['venue'].get('city'))
        state.append(label['venue'].get('state'))
        address.append(label['venue'].get('address_1'))
    else:
        location_json = geocode.reverse_geocode(label['group'].get('group_lat'), label['group'].get('group_lon'))
        country.append(location_json[0]['components'].get('country_code'))
        city.append(location_json[0]['components'].get('city'))
        state.append(location_json[0]['components'].get('state'))
        address.append(location_json[0].get('formatted'))
          
    meetup_name.append(label.get('name'))
    meetup_group_name.append(label['group'].get('name'))
    description.append(label.get('description'))
    event_url.append(label['event_url'])
    yes_rsvp_count.append(label.get('yes_rsvp_count'))
    status.append(label.get('status'))

In [32]:
# results_geocode = geocode.reverse_geocode(label['group']['group_lat'], label['group']['group_lon'])
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5") \
    .getOrCreate()

schema = T.StructType([
        T.StructField("id", T.StringType(), True),
        T.StructField("date", T.TimestampType(), True),
        T.StructField("year", T.IntegerType(), True),
        T.StructField("month", T.IntegerType(), True),
        T.StructField("day", T.IntegerType(), True),
        T.StructField("country", T.StringType(), True),
        T.StructField("city", T.StringType(), True),
        T.StructField("state", T.StringType(), True),
        T.StructField("address", T.StringType(), True),
        T.StructField("meetup_name", T.StringType(), True),
        T.StructField("meetup_group_name", T.StringType(), True),
        T.StructField("description", T.StringType(), True),
        T.StructField("event_url", T.StringType(), True),
        T.StructField("yes_rsvp_count", T.IntegerType(), True),
        T.StructField("status", T.StringType(), True)  
])

df_pandas = pd.DataFrame(np.transpose([id, date, year, month, day, country, city, state, address, meetup_name, meetup_group_name, description, event_url, yes_rsvp_count, status]), columns=columns)
df = spark.createDataFrame(df_pandas, schema=schema)
df.limit(5).toPandas()

Unnamed: 0,id,date,year,month,day,country,city,state,address,meetup_name,meetup_group_name,description,event_url,yes_rsvp_count,status
0,gqwjvqyzhbmb,2019-05-09 10:30:00,2019,5,2019,us,Fredericksburg,VA,919 Caroline St,Coffee Caravan and chat every Thursday morning...,Fredericksburg Developers Group (FredDev),<p>This is a regular meetup over coffee in Fre...,https://www.meetup.com/FredDev/events/260852903/,5,upcoming
1,zzcwmqyzhbmb,2019-05-09 12:00:00,2019,5,2019,us,Chattanooga,TN,1100 Market Street,Smarter Kubernetes Access Control: A Simpler A...,Chadev aka Chattanooga Developer Lunch,<p>Configuring Kubernetes authorization polici...,https://www.meetup.com/chadevs/events/260950371/,28,upcoming
2,fvjqfpyzhbmb,2019-05-09 15:00:00,2019,5,2019,US,Reno,,1091 S Virginia St,Book Club Lunch at India Kabab and Curry,/dev/reno: The Reno Developers Meetup,<p>We're meeting at India Kabab and Curry to h...,https://www.meetup.com/dev-reno/events/260712452/,7,upcoming
3,260643310,2019-05-09 12:00:00,2019,5,2019,us,Raleigh,NC,310 South Harrington Street,Lunch & Learn - Architecting Scalable & Secure...,RIoT,"<p>Join Gordon Blackwell, Cloud Solution Archi...",https://www.meetup.com/NC-RIoT-Regional-Intern...,40,upcoming
4,mhnfwqyzhbmb,2019-05-09 13:00:00,2019,5,2019,us,New York,NY,853 Broadway,Thursday Hacker Hours - meet on second floor,Hacker Hours,<p>**Please read our policies about the meetup...,https://www.meetup.com/hackerhours/events/2611...,16,upcoming


UDF Functions

In [58]:
from pyspark.sql.functions import udf

upper_udf = F.udf(lambda x: x.upper())

# Remove tags from description c
@udf
def remove_tags_udf(text):
    import re
    TAG_RE = re.compile(r'<[^>]+>')
    return TAG_RE.sub('', text)

In [54]:
df = df.withColumn('country', upper_udf(df.country))

In [55]:
df = df.withColumn('description', remove_tags_udf(df.description))

In [56]:
df.limit(5).toPandas()

Py4JJavaError: An error occurred while calling o416.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 20, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 342, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 331, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 80, in <lambda>
    return lambda *a: f(*a)
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 72, in <lambda>
    return lambda *a: g(f(*a))
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 72, in <lambda>
    return lambda *a: g(f(*a))
  File "<ipython-input-47-1db6a5439377>", line 2, in <lambda>
NameError: name 'xml' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3255)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3255)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 342, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 331, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 80, in <lambda>
    return lambda *a: f(*a)
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 72, in <lambda>
    return lambda *a: g(f(*a))
  File "/Users/StriderKeni/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 72, in <lambda>
    return lambda *a: g(f(*a))
  File "<ipython-input-47-1db6a5439377>", line 2, in <lambda>
NameError: name 'xml' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
