In [47]:
# Initialise findspark before the pyspark imports in the next cell.
import findspark
findspark.init()

In [48]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, expr
from pyspark.sql.types import StructType, StructField, StringType, DoubleType ,ArrayType ,LongType ,IntegerType
import mysql.connector
import os 
from configparser import ConfigParser
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode
from pyspark.sql import Row
import json
from datetime import datetime

In [49]:
# Pipeline settings (the "openweathermap" and "mysql" sections read further
# down) are loaded from an INI file next to the notebook.
config = ConfigParser()
config.read('./Config.ini')

['./Config.ini']

In [50]:
# Spark session for the pipeline; the MySQL connector jar is passed through
# the "spark.jars" setting.
spark = (
  SparkSession.builder
  .appName("Data_Pipeline")
  .config("spark.jars", "mysql-connector-java-5.1.49.jar")
  .getOrCreate()
)

In [51]:
# API key and endpoint are read from the "openweathermap" section of the config.
APIKEY = config.get("openweathermap", "APIKEY")
URL = config.get("openweathermap", "URL")

# Location used to build the request URL.
city = dict(name="Dakar", lat=14.67, lon=-17.44)

In [52]:

# Headers and (empty) JSON body stored alongside the request description.
headers = {
  'content-type': "application/json"
}
body = json.dumps({})

# Request URL: coordinates of `city`, hourly/daily sections excluded, metric units.
# NOTE: a commented-out sample URL containing a literal API key used to live
# here. It has been removed; keys must only come from Config.ini, and the
# exposed key should be revoked.
api_url = (
  f"{URL}?lat={city['lat']}&lon={city['lon']}"
  f"&exclude=hourly,daily&appid={APIKEY}&units=metric"
)

# One-row DataFrame whose columns are the arguments of the REST call.
RestApiRequestRow = Row("verb", "url", "headers", "body")
request_df = spark.createDataFrame([
  RestApiRequestRow("get", api_url, headers, body)
])



In [53]:
# Return type of the REST UDF: a struct with a single nullable "current"
# struct holding the fields listed below (all nullable).
schema = StructType([
  StructField(
    "current",
    StructType([
      StructField("dt", LongType(), True),
      StructField("temp", DoubleType(), True),
      StructField("humidity", IntegerType(), True),
      StructField("visibility", IntegerType(), True),
      StructField("wind_speed", DoubleType(), True),
    ]),
    True,
  ),
])

In [54]:
def executeRestApi(verb, url, headers, body, timeout=30):
  """Call a REST endpoint and return its decoded JSON payload.

  Args:
    verb: "get" issues a GET request; any other value issues a POST.
    url: Full request URL.
    headers: Mapping of HTTP headers. When empty or None, a JSON
      content-type header is sent instead.
    body: Request payload, passed to requests as ``data``.
    timeout: Seconds to wait for the server before giving up, so a stalled
      connection cannot block the caller forever.

  Returns:
    The parsed JSON document when the server answers with HTTP 200.
    None on a transport error, a non-200 status, or a body that is not
    valid JSON.
  """
  # Honour the caller's headers; only fall back to the JSON default.
  request_headers = dict(headers) if headers else {'content-type': "application/json"}
  send = requests.get if verb == "get" else requests.post
  try:
    res = send(url, data=body, headers=request_headers, timeout=timeout)
  except requests.RequestException:
    # Returning the exception object (as before) would hand a non-struct
    # value to a UDF declared with a struct return type; signal failure
    # with None instead.
    return None
  if res.status_code != 200:
    return None
  try:
    return json.loads(res.text)
  except ValueError:
    # json.JSONDecodeError is a ValueError subclass: 200 with a non-JSON body.
    return None

In [55]:
# Expose the REST helper to Spark as a UDF whose return type is `schema`.
udf_executeRestApi = udf(executeRestApi, returnType=schema)

In [56]:
# Add a "data" column holding the UDF result for each request row.
result_df = request_df.withColumn(
  "data",
  udf_executeRestApi(col("verb"), col("url"), col("headers"), col("body")),
)


In [57]:

from pyspark.sql.functions import split
from pyspark.sql.functions import from_json ,to_json
from pyspark.sql.types import MapType,StringType

# Fields kept from the "current" struct; compared with `schema`, "visibility"
# is not listed here.
current_schema = StructType([
  StructField("dt", LongType(), True),
  StructField("temp", DoubleType(), True),
  StructField("humidity", IntegerType(), True),
  StructField("wind_speed", DoubleType(), True),
])

# Call the API through the UDF, then flatten "data" -> "current" into
# top-level columns and give the first two friendlier names.
# NOTE(review): each struct is serialised with to_json and immediately parsed
# back with from_json; this looks redundant for "data" (same schema) and acts
# as a field filter for "current" — confirm before simplifying.
result_df_data = (
  request_df
  .withColumn("data", udf_executeRestApi(col("verb"), col("url"), col("headers"), col("body")))
  .withColumn("data", to_json(col("data")))
  .withColumn("data", from_json("data", schema))
  .select(col('data.*'))
  .withColumn("current", to_json(col("current")))
  .withColumn("current", from_json("current", current_schema))
  .select(col('current.*'))
  .withColumnRenamed('dt', 'timestamp')
  .withColumnRenamed('temp', 'temperature')
)


In [58]:
# Collect the flattened result; `row` is the list of Row objects displayed
# as this cell's output.
row = result_df_data.collect()
row

[Row(timestamp=1686763303, temperature=28.99, humidity=65, wind_speed=3.09)]

In [59]:
# Local import: only `datetime` itself is imported in the notebook's import cell.
from datetime import timezone

# Fail with a clear message instead of an IndexError/TypeError when the API
# call produced nothing usable.
if not row:
  raise RuntimeError("No weather record was returned by the API request")
weather_row = row[0]
if weather_row["timestamp"] is None:
  raise RuntimeError("The API request did not return a usable weather record")

# Epoch seconds -> UTC wall-clock string. datetime.utcfromtimestamp() is
# deprecated; the timezone-aware call below formats to the same text.
timestamp = datetime.fromtimestamp(weather_row["timestamp"], tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
# Access by column name rather than position so a column reorder cannot
# silently swap the values.
temperature = weather_row["temperature"]
humidity = weather_row["humidity"]
wind_speed = weather_row["wind_speed"]

print("timestamp", timestamp)
print("temperature", temperature)
print("humidity", humidity)
print("wind_speed", wind_speed)

timestamp 2023-06-14 17:21:43
temperature 28.99
humidity 65
wind_speed 3.09


In [60]:
# MySQL connection settings from the "mysql" section of the config.
# `url` receives the "host" option and `pt` the "port" option.
url, user, pwd, db, pt = (
  config.get('mysql', option)
  for option in ('host', 'user', 'password', 'database', 'port')
)

In [61]:
# Open a MySQL connection and insert the current weather reading into `data`.
conn = None  # pre-bound so the `finally` clause is safe when connect() itself fails
try:
  conn = mysql.connector.connect(user=user, database=db, password=pwd, host=url, port=pt)
  cursor = conn.cursor()
  # %s placeholders: the driver escapes the values instead of them being
  # spliced into the SQL text.
  query = """
  INSERT INTO `data` (`city`, `temperature`, `humidity`, `windSpeed`, `timestamp`)
  VALUES (%s, %s, %s, %s, %s)"""

  cursor.execute(query, (city["name"], temperature, humidity, wind_speed, timestamp))
  conn.commit()
  print(cursor.rowcount, "Record inserted successfully into data table")
  cursor.close()
except mysql.connector.Error as error:
  print("Failed to insert record into data table {}".format(error))

finally:
  # Single close point for both the success and the failure path.
  if conn is not None and conn.is_connected():
    conn.close()
    print("MySQL connection is closed")

1 Record inserted successfully into data table


In [62]:


# Disabled alternative: write the DataFrame to MySQL through Spark's JDBC writer.
# The literal host, user and password that used to be written here were
# removed; take them from the config values (url, pt, db, user, pwd) and
# rotate the exposed password.
# NOTE(review): the session is configured with mysql-connector-java-5.1.49.jar;
# confirm that the driver class below exists in that jar before enabling this.
# NOTE(review): "employee" does not match the `data` table used by the insert
# cell — confirm the target table.
# result_df_data.write \
#   .format("jdbc") \
#   .option("driver","com.mysql.cj.jdbc.Driver") \
#   .option("url", f"jdbc:mysql://{url}:{pt}/{db}") \
#   .option("dbtable", "employee") \
#   .option("user", user) \
#   .option("password", pwd) \
#   .save()