REST API stands for Representational State Transfer API.

REST APIs typically exchange compact payloads such as JSON over plain HTTP, which keeps bandwidth use low and makes them fast and lightweight to consume from Spark.


Importing libraries 

In [1]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

 Define the API endpoint URL

In [3]:
url = "https://soda.demo.socrata.com/resource/6yvf-kk3n.json"
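The per-source counts shown later in this notebook sum to 1000 rows, which suggests the endpoint returns a default page rather than the whole dataset. A minimal sketch of building a paged or filtered URL follows, assuming the SODA query parameters `$limit`, `$offset` and `$where`; the helper name `build_query_url` and the filter expression are illustrative and not part of the original notebook.

```python
from urllib.parse import urlencode

BASE_URL = "https://soda.demo.socrata.com/resource/6yvf-kk3n.json"

def build_query_url(base_url, limit=1000, offset=0, where=None):
    """Return base_url with SODA-style paging and filter parameters appended."""
    params = {"$limit": limit, "$offset": offset}
    if where:
        params["$where"] = where  # illustrative filter expression
    # safe="$" keeps the leading "$" of SODA parameter names unescaped
    return base_url + "?" + urlencode(params, safe="$")

# Second page of 50 records
print(build_query_url(BASE_URL, limit=50, offset=50))
```

The resulting string can be passed to `requests.get` in place of `url`.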

Send a GET request to the API and retrieve the JSON response

REST APIs commonly use four HTTP methods: GET (read), POST (create), PUT (update) and DELETE (remove)
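To make the four methods concrete, the sketch below builds one request per method without sending anything. It uses the standard library's `urllib.request.Request` so it runs offline; the `api.example.com` endpoint and the payloads are hypothetical.

```python
import json
from urllib.request import Request

# Hypothetical endpoint, used only to illustrate the four methods.
ITEMS_URL = "https://api.example.com/items"
ITEM_URL = ITEMS_URL + "/1"

def build_request(method, url, payload=None):
    """Build (but do not send) an HTTP request for the given method."""
    body = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {"Content-Type": "application/json"} if body else {}
    return Request(url, data=body, headers=headers, method=method)

requests_by_action = {
    "read": build_request("GET", ITEM_URL),
    "create": build_request("POST", ITEMS_URL, {"name": "demo"}),
    "update": build_request("PUT", ITEM_URL, {"name": "renamed"}),
    "remove": build_request("DELETE", ITEM_URL),
}

for action, req in requests_by_action.items():
    print(action, req.get_method(), req.full_url)
```

With the `requests` library the equivalent calls are `requests.get`, `requests.post`, `requests.put` and `requests.delete`. This notebook only needs GET.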

In [4]:
response = requests.get(url)
data = response.json()
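The cell above assumes the request succeeds and returns a JSON array. A slightly more defensive variant might look like the sketch below; `fetch_records`, `FakeResponse` and `fake_get` are hypothetical names, and the fake response exists only so the function can be exercised without network access.

```python
def fetch_records(url, get=None, timeout=10):
    """Fetch a JSON array from url, failing loudly on HTTP or format errors."""
    if get is None:
        import requests  # imported lazily so the function can be tested offline
        get = requests.get
    response = get(url, timeout=timeout)
    response.raise_for_status()  # raises on 4xx/5xx responses
    payload = response.json()
    if not isinstance(payload, list):
        raise ValueError(f"expected a JSON array, got {type(payload).__name__}")
    return payload


class FakeResponse:
    """Stand-in for requests.Response, used to exercise fetch_records offline."""

    def __init__(self, payload):
        self._payload = payload

    def raise_for_status(self):
        return None

    def json(self):
        return self._payload


def fake_get(url, timeout):
    # One illustrative record, not real API output
    return FakeResponse([{"source": "ak", "magnitude": "0.7"}])


records = fetch_records(
    "https://soda.demo.socrata.com/resource/6yvf-kk3n.json", get=fake_get
)
print(len(records))
```

In the notebook itself, `data = fetch_records(url)` would call `requests.get` with a timeout and stop early if the API returns an error status.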


Create a SparkSession

In [7]:
spark = SparkSession.builder.appName("RESTAPIS")\
                    .config('spark.jars.packages', 'org.postgresql:postgresql:42.5.4')\
                    .getOrCreate()
sqlContext = spark  # a SparkSession already exposes .sql(); its constructor expects a SparkContext, not a session
spark.sparkContext.setLogLevel("ERROR")

Convert the JSON response (a list of records) to a Spark DataFrame

In [8]:
df = spark.createDataFrame(data)
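Records returned by a JSON API do not always carry the same set of keys, which can make the inferred DataFrame columns unpredictable. One option, sketched here under that assumption, is to normalize every record to a fixed field list before calling `createDataFrame`. The helper `normalize_records` is hypothetical, and the sample records are illustrative, with fields dropped on purpose.

```python
FIELDS = ["source", "earthquake_id", "magnitude", "depth", "region"]

def normalize_records(records, fields=FIELDS):
    """Return records restricted to `fields`, filling missing keys with None."""
    return [{name: record.get(name) for name in fields} for record in records]

# Illustrative records: the first lacks "region", the second lacks "depth"
# and "region" and carries an unrelated key.
sample = [
    {"source": "hv", "earthquake_id": "hv60679056", "magnitude": "1.8", "depth": "2.7"},
    {"source": "ak", "earthquake_id": "ak11250315", "magnitude": "0.7", "extra": "ignored"},
]

rows = normalize_records(sample)
for row in rows:
    print(row)
```

The normalized list could then be passed to `spark.createDataFrame(rows)`, ideally together with an explicit schema so that columns containing only `None` still get a type.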

Select the desired fields from the DataFrame (col(...).alias(...) can rename a field if necessary)

In [9]:
RSTAPIS = df.select(col("source"), col("earthquake_id"),
                    col("magnitude"), col("depth"),
                    col("region"))

In [10]:
RSTAPIS.show()

+------+-------------+---------+-------+--------------------+
|source|earthquake_id|magnitude|  depth|              region|
+------+-------------+---------+-------+--------------------+
|    hv|   hv60679056|      1.8|    2.7|3km W of Volcano,...|
|    ak|   ak11250315|      0.7|   17.4|110km W of Cantwe...|
|    ak|   ak11249227|      2.1|   11.3|4km NE of Badger,...|
|    ak|   ak11242713|      0.8|      0|78km W of Healy, ...|
|    uw|   uw60757962|      2.3|      0|10km S of Princet...|
|    us|   usb000q0yd|      5.1|     10|Pacific-Antarctic...|
|    ak|   ak11249866|      0.7|      8|104km W of Cantwe...|
|    us|   usb000qhnh|      5.5|     10|Southeast Indian ...|
|    ak|   ak11259904|      0.9|   14.1|54km WNW of Valde...|
|    us|   usb000q4uy|      4.8|     10|49km NNE of Andro...|
|    nn|   nn00446263|     0.19|10.5214|12km S of Reno, N...|
|    hv|   hv60688941|        2|    2.1|5km SW of Volcano...|
|    uw|   uw60052508|      1.7|      0|21km SW of Gold B...|
|    nn|

                                                                                

In [11]:
RSTAPIS.printSchema()

root
 |-- source: string (nullable = true)
 |-- earthquake_id: string (nullable = true)
 |-- magnitude: string (nullable = true)
 |-- depth: string (nullable = true)
 |-- region: string (nullable = true)
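The schema shows `magnitude` and `depth` as strings, so sorting or comparing them works lexicographically (for example, the string "10" sorts before "2.7"). A minimal sketch of converting these fields to floats on the Python side, before the DataFrame is created, is given below; `to_float` and `with_numeric_fields` are hypothetical helpers.

```python
def to_float(value):
    """Convert value to float, returning None when it is missing or malformed."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

def with_numeric_fields(record, numeric_fields=("magnitude", "depth")):
    """Return a copy of record with the listed fields converted to floats."""
    converted = dict(record)
    for name in numeric_fields:
        converted[name] = to_float(converted.get(name))
    return converted

# Values taken from one row of the output above (region omitted)
row = {"source": "nn", "earthquake_id": "nn00446263",
       "magnitude": "0.19", "depth": "10.5214"}
print(with_numeric_fields(row))
```

The same result can be obtained inside Spark by casting, for example `RSTAPIS.withColumn("magnitude", col("magnitude").cast("double"))`.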



Create a temporary view (session-scoped; createOrReplaceTempView does not create a Hive table)

In [11]:
RSTAPIS.createOrReplaceTempView("sodaAPI")

In [12]:
mydf = sqlContext.sql("SELECT DISTINCT * FROM sodaAPI")

In [13]:
mydf.show()

+------+-------------+---------+-------+--------------------+
|source|earthquake_id|magnitude|  depth|              region|
+------+-------------+---------+-------+--------------------+
|    ci|   ci37218480|     1.58|   4.92|27km ENE of Pine ...|
|    nc|   nc72206675|      0.5|    1.3|2km WNW of Cobb, ...|
|    ak|   ak11265217|      1.2|    0.8|74km NNW of Talke...|
|    ak|   ak11254160|        3|   13.8|98km WSW of Healy...|
|    ci|   ci37218832|     0.23|   0.14|40km N of Inyoker...|
|    ak|   ak11245826|      1.2|    1.3|103km W of Cantwe...|
|    us|   usb000q35s|      4.8|     35|72km SSE of Kirak...|
|    ci|   ci37216376|     0.38|  14.65|6km NW of Anza, C...|
|    us|   usb000pw9i|      4.6|  64.12|76km WSW of Pangu...|
|    nc|   nc72215151|      1.1|    4.7|11km SW of San Si...|
|    nc|   nc72205675|      0.3|    6.6|1km ESE of Mammot...|
|    ak|   ak11242458|        1|    6.4|113km NW of Talke...|
|    ak|   ak11254614|      2.4|   38.6|74km NNE of Sutto...|
|    ak|

                                                                                

In [14]:
mydf.head()

Row(source='ci', earthquake_id='ci37218480', magnitude='1.58', depth='4.92', region='27km ENE of Pine Valley, California')

In [15]:
# Count rows per source and sort ascending by count (the earlier select() was redundant for this aggregation)
sourceinfo = mydf.groupBy('source').count().orderBy('count')

In [16]:
sourceinfo.show()

+------+-----+
|source|count|
+------+-----+
|    nm|    4|
|    mb|    5|
|    uu|   27|
|    uw|   33|
|    nn|   41|
|    hv|   41|
|    pr|   42|
|    us|   90|
|    ci|  147|
|    nc|  241|
|    ak|  329|
+------+-----+
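To illustrate what `groupBy('source').count().orderBy('count')` computes, the sketch below repeats the aggregation in plain Python on the `source` values of the thirteen complete rows printed by `RSTAPIS.show()` earlier. The tie-break on the source name is an addition for determinism; Spark's `orderBy('count')` alone does not fix the order of equal counts.

```python
from collections import Counter

# "source" column of the 13 complete rows shown in the RSTAPIS.show() output
sources = ["hv", "ak", "ak", "ak", "uw", "us", "ak",
           "us", "ak", "us", "nn", "hv", "uw"]

counts = Counter(sources)  # equivalent of groupBy('source').count()

# Equivalent of orderBy('count'), with the name as a tie-breaker
ordered = sorted(counts.items(), key=lambda item: (item[1], item[0]))

for source, count in ordered:
    print(source, count)
```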



Store the data in PostgreSQL, in a table named sourceinfo

In [18]:
sourceinfo.write \
  .format("jdbc") \
  .option("url", "jdbc:postgresql://192.168.0.104:5432/postgres") \
  .option("driver", "org.postgresql.Driver") \
  .option("dbtable", "public.sourceinfo") \
  .option("user", "postgres") \
  .option("password", "erick")\
  .save()
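The cell above hard-codes the database password. One possible alternative, sketched here, reads the connection settings from environment variables. The variable names follow the libpq convention (`PGHOST`, `PGPORT`, `PGDATABASE`, `PGUSER`, `PGPASSWORD`), and the helper `build_jdbc_options` and the placeholder values are assumptions, not part of the original notebook.

```python
import os

def build_jdbc_options(env=os.environ, table="public.sourceinfo"):
    """Build Spark JDBC options for PostgreSQL from environment-style settings."""
    host = env.get("PGHOST", "localhost")
    port = env.get("PGPORT", "5432")
    database = env.get("PGDATABASE", "postgres")
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "driver": "org.postgresql.Driver",
        "dbtable": table,
        "user": env["PGUSER"],          # raises KeyError if not set
        "password": env["PGPASSWORD"],  # raises KeyError if not set
    }

# Explicit mapping with placeholder credentials, for illustration only
options = build_jdbc_options(
    {"PGHOST": "192.168.0.104", "PGUSER": "postgres", "PGPASSWORD": "change-me"}
)
print(options["url"])

# In the notebook, the write could then become:
# sourceinfo.write.format("jdbc").options(**options).mode("overwrite").save()
```

The default save mode raises an error when the target table already exists; `mode("overwrite")` or `mode("append")` makes the cell safe to re-run, at the cost of replacing or extending the existing table.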

In [None]:
Check in pgAdmin to confirm that the table was created successfully.