In [1]:
import pandas as pd
import numpy as np
import requests
import os
from dotenv import load_dotenv
import fmpsdk as fmp
from pyspark.sql import SparkSession

In [2]:
# Load variables from a local .env file (if one exists) into the process environment.
load_dotenv()
# API key for Financial Modeling Prep. Fail fast when it is missing instead of
# passing apikey=None to the SDK further down the notebook.
fmp_key = os.getenv("fmp_key")
if not fmp_key:
    raise RuntimeError(
        'Environment variable "fmp_key" is not set; add it to .env or export it.'
    )

In [5]:
# Create the notebook's Spark session, or reuse one that is already running.
spark2 = (
    SparkSession.builder
    .appName("crypto-Data")
    .getOrCreate()
)

In [6]:
# Exploration aid: print the SDK's signature and docstring for the price-history call.
help(fmp.historical_price_full)

Help on function historical_price_full in module fmpsdk.general:

historical_price_full(apikey: str, symbol: Union[str, List], time_series: int = None, series_type: str = None, from_date: str = None, to_date: str = None) -> Optional[List[Dict]]
    Query FMP Historical Price Full API.
    
    This API endpoint is a multifunction tool!
    
    :param apikey: Your API Key
    :param symbol: The Ticker, Index, Commodity, etc. symbol to query for.
    :param time_series: Not sure what this is.  5 is the only value I've seen used.
    :param series_type: Not sure what this is.  "line" is the only option I've seen used.
    :param from_date: 'YYYY-MM-DD' format
    :param to_date: 'YYYY-MM-DD' format
    :return: A list of dictionaries.



In [7]:
# Fetch BTC/USD price history from Financial Modeling Prep.
# NOTE: despite its name, `btc_df` holds the raw API payload (the SDK documents
# a list of dicts), not a DataFrame; the name is kept because later cells use it.
btc_df = fmp.historical_price_full(
    apikey=fmp_key,
    symbol="BTCUSD",
    from_date="2004-02-10",
)
# The SDK is annotated as returning Optional[List[Dict]]; stop here on a
# missing or empty payload rather than propagating an empty frame downstream.
if not btc_df:
    raise RuntimeError(
        "FMP returned no historical price data for BTCUSD; check fmp_key and the symbol."
    )

In [8]:
# Build a pandas DataFrame from the API payload held in `btc_df`.
df_pandas = pd.DataFrame(btc_df)

In [9]:
# Preview the first rows to check the columns and the row ordering.
df_pandas.head()

Unnamed: 0,date,open,high,low,close,adjClose,volume,unadjustedVolume,change,changePercent,vwap,label,changeOverTime
0,2024-02-24,50737.0,51717.0,50576.55,51496.02,51468.07031,15118066688,15118066688,759.02,1.5,51180.27,"February 24, 24",0.015
1,2024-02-23,51300.1,51537.0,50227.0,50737.0,50731.94922,21427078270,21427078270,-563.1,-1.1,51018.44,"February 23, 24",-0.011
2,2024-02-22,51866.9,52100.0,50890.2,51258.7,51304.97266,25413900611,25413900611,-608.2,-1.17,51592.97,"February 22, 24",-0.0117
3,2024-02-21,52270.1,52394.91,50439.0,51851.67,51839.17969,28624907020,28624907020,-418.43,-0.80052,51313.06,"February 21, 24",-0.008005
4,2024-02-20,51771.2,53000.0,50584.0,52167.0,52284.875,33353758256,33353758256,395.8,0.76452,51914.82,"February 20, 24",0.007645


In [10]:
# Number of rows in the downloaded history.
len(df_pandas)

3759

In [11]:
# Exploration aid: print the signature and docstring of SparkSession.createDataFrame.
help(spark2.createDataFrame)

Help on method createDataFrame in module pyspark.sql.session:

createDataFrame(data: Union[pyspark.rdd.RDD[Any], Iterable[Any], ForwardRef('PandasDataFrameLike'), ForwardRef('ArrayLike')], schema: Union[pyspark.sql.types.AtomicType, pyspark.sql.types.StructType, str, NoneType] = None, samplingRatio: Optional[float] = None, verifySchema: bool = True) -> pyspark.sql.dataframe.DataFrame method of pyspark.sql.session.SparkSession instance
    Creates a :class:`DataFrame` from an :class:`RDD`, a list, a :class:`pandas.DataFrame`
    or a :class:`numpy.ndarray`.
    
    .. versionadded:: 2.0.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    data : :class:`RDD` or iterable
        an RDD of any kind of SQL data representation (:class:`Row`,
        :class:`tuple`, ``int``, ``boolean``, etc.), or :class:`list`,
        :class:`pandas.DataFrame` or :class:`numpy.ndarray`.
    schema : :class:`pyspark.sql.types.DataType`, str or list, op

In [12]:
# Convert the pandas frame to a Spark DataFrame. No schema is passed, so the
# column types are left to Spark to derive from the pandas data.
spark_df = spark2.createDataFrame(df_pandas)

In [13]:
# Print the first three rows as a sanity check on the converted frame.
spark_df.show(n=3)

+----------+-------+-------+--------+--------+-----------+-----------+----------------+------+-------------+--------+---------------+--------------+
|      date|   open|   high|     low|   close|   adjClose|     volume|unadjustedVolume|change|changePercent|    vwap|          label|changeOverTime|
+----------+-------+-------+--------+--------+-----------+-----------+----------------+------+-------------+--------+---------------+--------------+
|2024-02-24|50737.0|51717.0|50576.55|51496.02|51468.07031|15118066688|     15118066688|759.02|          1.5|51180.27|February 24, 24|         0.015|
|2024-02-23|51300.1|51537.0| 50227.0| 50737.0|50731.94922|21427078270|     21427078270|-563.1|         -1.1|51018.44|February 23, 24|        -0.011|
|2024-02-22|51866.9|52100.0| 50890.2| 51258.7|51304.97266|25413900611|     25413900611|-608.2|        -1.17|51592.97|February 22, 24|       -0.0117|
+----------+-------+-------+--------+--------+-----------+-----------+----------------+------+------------

In [14]:
# Persist the BTC history to the SQL Server table `btc_data` over JDBC.
# - The host part of the URL is a raw string: "\S" in a normal literal is an
#   invalid escape sequence that newer Python versions warn about. The runtime
#   value of the URL is unchanged.
# - Credentials come from the environment (the .env file is loaded by the
#   load_dotenv() cell near the top) instead of being stored in the notebook.
#   Define `mssql_user` and `mssql_password` there; a missing entry raises
#   KeyError before any connection is attempted.
# - No save mode is set, so Spark's default applies; re-running this cell
#   against an existing `btc_data` table is expected to fail rather than
#   overwrite or append.
jdbc_url = (
    r"jdbc:sqlserver://ZAHRA\SQLEXPRESS:61254;"
    "database=stock_fundamentals;trustServerCertificate=true;encrypt=true"
)
(
    spark_df.write
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "btc_data")
    .option("user", os.environ["mssql_user"])
    .option("password", os.environ["mssql_password"])
    .save()
)

In [14]:
# Uncomment to shut down the Spark session once the notebook is finished:
# spark2.stop()