# Make the code reusable

In [3]:
from pyspark.sql.functions import explode,explode_outer
from pyspark.sql.types import StructType
import requests, json
# Import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode,col,lit
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType

# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()
# Make a request to get information about: "Bulgaria", "Germany", "Italy", "Peru"
# Parse the response as JSON

def getCountryData(country):
    response = requests.get(f"https://restcountries.com/v3.1/name/{country}")
    return response.json();


def getCountryName(country):
    response = requests.get(f"https://restcountries.com/v3.1/alpha/{country}")
    # Parse the response as JSON
    data = response.json()
    return data[0]['name']['common'];

def extractCurrencyName(df):
    df = df.select(explode(col("currencies")))
    return df.select('value.name').rdd.flatMap(lambda x: x).collect();

def prepareDataFrame(country_data):
    df = spark.createDataFrame(country_data)
    currency_name = extractCurrencyName(df)
    if "borders" in df.columns:
        df = df.select(col("name.common").alias("name"),"region","capital","borders")
        df = df.withColumn("borders" , explode_outer(col("borders")))
        custom_getCountryName = F.udf(getCountryName, StringType())
        df = df.withColumn('borders', custom_getCountryName(df.borders))
    else:
        df = df.select(col("name.common").alias("name"),"region","capital")
    df = df.withColumn("capital" , explode("capital"))
    df = df.withColumn("currency" , lit(currency_name))
 
    if "borders" in df.columns:
        return df.groupby("name","region","capital","currency").agg(F.collect_set("borders").alias("borders"));
    else:
        return df;

def showCountryData(country_list):
    df = None
    for country in country_list:
        country_data = getCountryData(country)
        country_df = prepareDataFrame(country_data)
        if df is None:
            df = country_df
        else:
            df = df.unionByName(country_df, allowMissingColumns=True)
    return df;


country_list = ["Cuba","Bulgaria", "Germany", "Italy", "Peru"]
showCountryData(country_list)

DataFrame[name: string, region: string, capital: string, currency: array<string>, borders: array<string>]

In [4]:
from functools import reduce
from pyspark.sql import DataFrame

def getAdditionalCountryData(country):
    response = requests.get(f"https://restcountries.com/v3.1/alpha/{country}")
    res_json = response.json()
    additionalInfo = spark.createDataFrame(res_json)
    return additionalInfo.select(col("name.common").alias("name"),"subregion","languages","flag","population","fifa","timezones",col("flags.png").alias("png"));

def getCountryCode(country):
    response = requests.get(f"https://restcountries.com/v3.1/name/{country}")
    # Parse the response as JSON
    data = response.json()
    return data[0]['cioc'];
    
def extractLanguages(df):
    df = df.select(explode(col("languages")))
    return df.select('value').rdd.flatMap(lambda x: x).collect();
    
def extractOnlyNeededValues(additionalInfo):
    language = extractLanguages(additionalInfo)
    additionalInfo = additionalInfo.withColumn("languages" , lit(language))
    additionalInfo = additionalInfo.withColumn("timezones" , explode(col("timezones")))
    return additionalInfo;

def showAdditionalCountryData(country_list):
    dfs = []
    for country in country_list:
        country_name = getCountryCode(country)
        country_data = getAdditionalCountryData(country_name)
        country_df = extractOnlyNeededValues(country_data)
        dfs.append(country_df)
    df_additional = reduce(DataFrame.union,dfs)
    return df_additional;

showAdditionalCountryData(country_list)

DataFrame[name: string, subregion: string, languages: array<string>, flag: string, population: bigint, fifa: string, timezones: string, png: string]

In [5]:
main_data = showCountryData(country_list)
additional_data = showAdditionalCountryData(country_list)
full_data = main_data.join(additional_data,main_data["name"] == additional_data["name"]).drop(additional_data["name"])
full_data.show()

+--------+--------+-------+--------------------+--------------------+----------------+--------------------+----+----------+----+---------+--------------------+
|    name|  region|capital|            currency|             borders|       subregion|           languages|flag|population|fifa|timezones|                 png|
+--------+--------+-------+--------------------+--------------------+----------------+--------------------+----+----------+----+---------+--------------------+
|Bulgaria|  Europe|  Sofia|     [Bulgarian lev]|[North Macedonia,...|Southeast Europe|         [Bulgarian]|🇧🇬|   6927288| BUL|UTC+02:00|https://flagcdn.c...|
|    Cuba|Americas| Havana|[Cuban convertibl...|                null|       Caribbean|           [Spanish]|🇨🇺|  11326616| CUB|UTC-05:00|https://flagcdn.c...|
| Germany|  Europe| Berlin|              [Euro]|[France, Austria,...|  Western Europe|            [German]|🇩🇪|  83240525| GER|UTC+01:00|https://flagcdn.c...|
|   Italy|  Europe|   Rome|              [Euro

In [6]:
#write the whole data to a file
json_rdd = full_data.rdd.map(lambda row: json.dumps(row.asDict()))
json_rdd.saveAsTextFile("output.json")

Py4JJavaError: An error occurred while calling o1691.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/Users/ppe/Documents/Pyspark_repo/output.json already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:299)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1593)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1593)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1579)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1579)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:563)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
