In [6]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, unix_timestamp, col

# Create (or reuse) the SparkSession used by every step in this notebook
spark = SparkSession.builder.appName("CombineDataFrames").getOrCreate()


def load_symbol(symbol):
    """Read ``<symbol>.csv`` and tag every row with the symbol it came from.

    Args:
        symbol: Trading-pair name such as "BTCUSDT". It is used both as the
            CSV file stem (path is relative to the working directory) and as
            the value of the added ``source`` column.

    Returns:
        A Spark DataFrame with the file's columns (names taken from the
        header row, types inferred by Spark) plus a constant ``source`` column.
    """
    frame = spark.read.csv(f"{symbol}.csv", header=True, inferSchema=True)
    return frame.withColumn("source", lit(symbol))


# Load each pair through the same helper so the read options and the
# 'source' tag cannot drift apart between symbols.
btc_df = load_symbol("BTCUSDT")
doge_df = load_symbol("DOGEUSDT")
eth_df = load_symbol("ETHUSDT")

# Stack the three frames; unionByName matches columns by name, not position
combined_df = btc_df.unionByName(doge_df).unionByName(eth_df)

# Replace 'timestamp' with Unix epoch seconds. unix_timestamp already returns
# a long column, so no additional cast is required.
# Null timestamps are then filled with 0, which reads as 1970-01-01 (epoch).
# NOTE(review): the original comment said this is done ahead of a numpy
# conversion that is not visible in this notebook - confirm that 0 is an
# acceptable sentinel for downstream consumers.
combined_df = (
    combined_df
    .withColumn("timestamp", unix_timestamp("timestamp"))
    .fillna(0, subset=["timestamp"])
)

# Preview the combined dataframe (optional)
combined_df.show()

+----------+-------+-------+-------+-------+--------+--------------------+------------------+----------------+---------------------------+----------------------------+------+-------+
| timestamp|   open|   high|    low|  close|  volume|          close_time|quote_asset_volume|number_of_trades|taker_buy_base_asset_volume|taker_buy_quote_asset_volume|ignore| source|
+----------+-------+-------+-------+-------+--------+--------------------+------------------+----------------+---------------------------+----------------------------+------+-------+
|1502942400|4261.48|4261.48|4261.48|4261.48|1.775183|2017-08-17 04:00:...|     7564.90685084|               3|                   0.075183|                320.39085084|     0|BTCUSDT|
|1502942460|4261.48|4261.48|4261.48|4261.48|     0.0|2017-08-17 04:01:...|               0.0|               0|                        0.0|                         0.0|     0|BTCUSDT|
|1502942520|4280.56|4280.56|4280.56|4280.56|0.261074|2017-08-17 04:02:...|     1117.5

In [7]:
# Collapse to a single partition so Spark emits one part file, then write it
# with a header row, replacing any previous output.
# Note: Spark creates a *directory* named "combined_data.csv" that contains
# the part file; it does not create a single flat file at that path.
(
    combined_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .csv("combined_data.csv", header=True)
)

In [4]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [5]:
pip install pyspark

