In [1]:
# REST data source for Spark (RestDataSource) used by this notebook:
# https://github.com/sourav-mazumder/Data-Science-Extensions/tree/master/spark-datasource-rest

In [2]:
import os
from urllib.parse import quote

import findspark
import pandas as pd

# findspark.init() must run before the first pyspark import so that the
# Spark installation is on sys.path when pyspark is resolved.
findspark.init()

from pyspark.sql import functions as F

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession

# Build the configuration once and actually hand it to Spark; getOrCreate()
# reuses an existing session/context, so re-running this cell does not fail
# with "Cannot run multiple SparkContexts at once".
conf = SparkConf().setAppName("CurrentWeather").setMaster("local")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

In [4]:
def get_new_data(cities, api_key=None, verbose=True):
    """Fetch the current weather for each city into one pandas DataFrame.

    For every city a one-row Spark temp view named ``weather_input_table`` is
    (re)created and the OpenWeatherMap ``/data/2.5/weather`` endpoint is read
    through the REST data source. The ``output.main`` fields of the response
    are then flattened into plain columns. Uses the module-level ``spark``
    session.

    Parameters
    ----------
    cities : iterable of str
        City names to look up. Each name is URL-encoded before it is placed
        in the query string.
    api_key : str, optional
        OpenWeatherMap API key. When omitted, the ``OPENWEATHER_API_KEY``
        environment variable is used, falling back to the key embedded below.
    verbose : bool, default True
        Print the raw and the flattened Spark DataFrame for every city.

    Returns
    -------
    pandas.DataFrame
        One row per entry in ``cities`` (the first row returned for it), with
        columns ``city_name, feels_like, humidity, pressure, temp, temp_max,
        temp_min`` and a 0..n-1 index. Empty (columns only) when ``cities``
        is empty.

    Raises
    ------
    IndexError
        If the lookup for a city yields no rows.
    """
    result_columns = ['city_name', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min']
    weather_fields = result_columns[1:]

    if api_key is None:
        # FIXME(security): the fallback key is a credential committed to the
        # notebook; rotate it and drop the default once OPENWEATHER_API_KEY
        # is set wherever this notebook runs.
        api_key = os.environ.get('OPENWEATHER_API_KEY', '24b234cb23d2bddfc71ee6db0e1b1d6c')

    per_city_frames = []
    for city in cities:
        # quote() keeps names containing spaces or non-ASCII characters valid in the URL
        uri = 'https://api.openweathermap.org/data/2.5/weather?q=' + quote(city) + '&appid=' + api_key

        # the REST data source takes its input rows from a temp view
        query_df = pd.DataFrame({'query': [city]})
        weather_input_df = spark.createDataFrame(query_df)
        weather_input_df.createOrReplaceTempView('weather_input_table')

        # parameter map handed to the REST data source
        rest_options = {
            'url': uri,
            'input': 'weather_input_table',
            'method': 'GET',
            'readTimeout': '10000',
            'connectionTimeout': '2000',
            'partitions': '10',
        }
        weather_result_df = (
            spark.read
            .format("org.apache.dsext.spark.datasource.rest.RestDataSource")
            .options(**rest_options)
            .load()
        )
        if verbose:
            # NOTE: every show() is a separate Spark action on the REST-backed frame
            weather_result_df.show()

        # pull the nested output.main.* fields up into top-level columns
        new_df = weather_result_df.select(
            *[F.col('output.main.' + field).alias(field) for field in weather_fields]
        )
        if verbose:
            new_df.show()

        pd_df = new_df.toPandas()
        pd_df['city_name'] = city
        # keep only the first row per city; iloc[[0]] raises IndexError on an empty frame
        per_city_frames.append(pd_df.iloc[[0]])

    if not per_city_frames:
        return pd.DataFrame(columns=result_columns)
    # build the result once instead of growing a DataFrame row by row
    return pd.concat(per_city_frames, ignore_index=True)[result_columns]


In [5]:
# Cities whose current weather is requested, in output order.
# NOTE(review): 'barcelona' is listed twice, which yields two identical rows
# and an extra API request — confirm whether another city was intended.
cities = [
    'barcelona',
    'seville',
    'madrid',
    'barcelona',
    'valencia',
]

In [6]:
# Fetch the weather for every entry in `cities`; the bare last expression
# makes the notebook render the resulting pandas DataFrame.
new_data = get_new_data(cities)
new_data

+--------------------+---------+
|              output|    query|
+--------------------+---------+
|[stations, [20], ...|barcelona|
+--------------------+---------+

+----------+--------+--------+------+--------+--------+
|feels_like|humidity|pressure|  temp|temp_max|temp_min|
+----------+--------+--------+------+--------+--------+
|    277.49|      61|    1005|281.46|  282.59|  279.82|
+----------+--------+--------+------+--------+--------+

+--------------------+-------+
|              output|  query|
+--------------------+-------+
|[stations, [75], ...|seville|
+--------------------+-------+

+----------+--------+--------+------+--------+--------+
|feels_like|humidity|pressure|  temp|temp_max|temp_min|
+----------+--------+--------+------+--------+--------+
|     281.4|      81|    1018|282.88|  283.71|  281.48|
+----------+--------+--------+------+--------+--------+

+--------------------+------+
|              output| query|
+--------------------+------+
|[stations, [75], ...|madr

Unnamed: 0,city_name,feels_like,humidity,pressure,temp,temp_max,temp_min
0,barcelona,277.49,61,1005,281.46,282.59,279.82
0,seville,281.4,81,1018,282.88,283.71,281.48
0,madrid,270.88,86,1011,278.15,278.15,278.15
0,barcelona,277.49,61,1005,281.46,282.59,279.82
0,valencia,275.9,61,1010,282.57,283.15,282.04


In [7]:
# index must be the boolean False: the string "False" is truthy, so the index
# column would still be written to the file.
new_data.to_csv('new_weather_data_5_cities.csv', index=False)