In [29]:
import yfinance as yf
import psycopg2
from psycopg2.extras import execute_values
import pandas as pd



from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType,
    FloatType, DateType, IntegerType
)
import os

# Driver ve Worker için Python sürümünü belirtin
os.environ["PYSPARK_PYTHON"] = "/opt/homebrew/Caskroom/miniforge/base/envs/pyspark_env/bin/python3.10"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/opt/homebrew/Caskroom/miniforge/base/envs/pyspark_env/bin/python3.10"

# 1.1 Şema Tanımları

stock_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("ticker", StringType(), True),
    StructField("date", DateType(), True),
    StructField("open", FloatType(), True),
    StructField("high", FloatType(), True),
    StructField("low", FloatType(), True),
    StructField("close", FloatType(), True),
    StructField("adj_close", FloatType(), True),
    StructField("volume", IntegerType(), True),
])

index_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("index_name", StringType(), True),
    StructField("date", DateType(), True),
    StructField("open", FloatType(), True),
    StructField("high", FloatType(), True),
    StructField("low", FloatType(), True),
    StructField("close", FloatType(), True),
    StructField("adj_close", FloatType(), True),
    StructField("volume", IntegerType(), True),
])

macro_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("symbol", StringType(), True),
    StructField("date", DateType(), True),
    StructField("open", FloatType(), True),
    StructField("high", FloatType(), True),
    StructField("low", FloatType(), True),
    StructField("close", FloatType(), True),
    StructField("volume", IntegerType(), True),
])


In [30]:
def extract_data(tickers):
    """
    Fetch historical data for stocks, indices, and macroeconomic indicators.
    
    :param tickers: List of tickers (stocks, indices, or macro symbols).
    :return: Dictionary of pandas DataFrames with ticker as key.
    """
    data_dict = {}
    for ticker in tickers:
        try:
            print(f"Fetching data for {ticker}...")
            data = yf.download(ticker, start="2014-01-01", end="2024-01-01")
            df = data.reset_index()  # 'Date' kolonunu sıralı indeks olmaktan çıkar
            data_dict[ticker] = df
        except Exception as e:
            print(f"Error fetching data for {ticker}: {e}")
    return data_dict


In [31]:
def transform_data(raw_data):
    """
    Convert raw pandas DataFrames into Spark DataFrames with appropriate schemas.
    
    :param raw_data: Dictionary {ticker: pandas.DataFrame}
    :return: Dictionary {ticker: spark.DataFrame}
    """
    spark = SparkSession.builder.appName("Data Transformation").getOrCreate()
    
    # YFinance -> PySpark kolon eşleştirmesi
    rename_map = {
        "Date": "date",
        "Open": "open",
        "High": "high",
        "Low": "low",
        "Close": "close",
        "Adj Close": "adj_close",
        "Volume": "volume"
    }
    
    transformed_data = {}
    
    for ticker, df in raw_data.items():
        # 1) MultiIndex'i düzleştirme (eğer tek ticker'da MultiIndex oluştuysa)
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = [
                "_".join(tuple(str(col_part) for col_part in col if col_part))
                for col in df.columns.to_flat_index()
            ]
        
        # 2) Kolonları rename et, yoksa None oluştur
        for old_col, new_col in rename_map.items():
            if old_col in df.columns:
                df.rename(columns={old_col: new_col}, inplace=True)
            else:
                df[new_col] = None  # Kolon yoksa None ile oluştur
        
        # 3) 'id' kolonunu ekle
        if "id" not in df.columns:
            df.insert(0, "id", range(1, len(df) + 1))
        
        # 4) Ticker türüne göre hangi şema?
        if ticker in ["AAPL", "NVDA", "MSFT", "AVGO", "META", "AMZN", "TSLA"]:
            # Stok (hisse senedi)
            schema = stock_schema
            if "ticker" not in df.columns:
                df["ticker"] = ticker
            # PySpark DataFrame'e geçirirken kolon sırasına dikkat
            df = df[["id", "ticker", "date", "open", "high", "low", "close", "adj_close", "volume"]]
        
        elif ticker in ["^GSPC", "NQ=F", "RTY=F", "^DJI"]:
            # Endeks
            schema = index_schema
            if "index_name" not in df.columns:
                df["index_name"] = ticker
            df = df[["id", "index_name", "date", "open", "high", "low", "close", "adj_close", "volume"]]
        
        else:
            # Makro
            schema = macro_schema
            if "symbol" not in df.columns:
                df["symbol"] = ticker
            df = df[["id", "symbol", "date", "open", "high", "low", "close", "volume"]]
        
        # 5) Spark DataFrame'e dönüştür
        try:
            spark_df = spark.createDataFrame(df, schema=schema)
            spark_df = spark_df.dropna()  # Null kayıtları düşürmek isterseniz
            transformed_data[ticker] = spark_df
        except Exception as e:
            print(f"Error transforming data for {ticker}: {e}")
    
    return transformed_data


In [32]:
def load_data(rows, table_name, db_config):
    """
    Load the transformed data into PostgreSQL.
    
    :param rows: List of Spark Row objects
    :param table_name: Target table name in PostgreSQL
    :param db_config: Dictionary with connection details
    """
    try:
        conn = psycopg2.connect(**db_config)
        cursor = conn.cursor()
        
        # Tablo isimlerine göre farklı INSERT tanımları
        if table_name == "stock_data":
            query = f"""
                INSERT INTO {table_name} (
                    id, ticker, date, open, high, low, close, adj_close, volume
                ) VALUES %s
            """
            values = [
                (
                    row.id,
                    row.ticker,
                    row.date,
                    row.open,
                    row.high,
                    row.low,
                    row.close,
                    row.adj_close,
                    row.volume
                )
                for row in rows
            ]
        
        elif table_name == "index_data":
            query = f"""
                INSERT INTO {table_name} (
                    id, index_name, date, open, high, low, close, adj_close, volume
                ) VALUES %s
            """
            values = [
                (
                    row.id,
                    row.index_name,
                    row.date,
                    row.open,
                    row.high,
                    row.low,
                    row.close,
                    row.adj_close,
                    row.volume
                )
                for row in rows
            ]
        
        else:  # macro_data
            query = f"""
                INSERT INTO {table_name} (
                    id, symbol, date, open, high, low, close, volume
                ) VALUES %s
            """
            values = [
                (
                    row.id,
                    row.symbol,
                    row.date,
                    row.open,
                    row.high,
                    row.low,
                    row.close,
                    row.volume
                )
                for row in rows
            ]
        
        # Veri ekleme (bulk insert)
        execute_values(cursor, query, values)
        conn.commit()
        print(f"Data successfully loaded into {table_name}.")
    
    except Exception as e:
        print(f"Error loading data into PostgreSQL: {e}")
    
    finally:
        if conn:
            conn.close()


In [33]:
def main():
    # 1) Spark oturumu
    spark = SparkSession.builder \
        .appName("Yahoo Finance to PostgreSQL") \
        .getOrCreate()
    
    # 2) Ticker listeleri
    stock_tickers = ["AAPL", "NVDA", "MSFT", "AVGO", "META", "AMZN", "TSLA"]
    index_tickers = ["^GSPC", "NQ=F", "RTY=F", "^DJI"]
    macro_tickers = ["GC=F", "CL=F", "^VIX", "DX-Y.NYB", "^IRX", "^TNX"]  # örnek makro
    
    all_tickers = stock_tickers + index_tickers + macro_tickers
    
    # 3) Veri çekme
    raw_data = extract_data(all_tickers)
    
    # 4) Veri dönüştürme
    transformed_data = transform_data(raw_data)
    
    # 5) Veritabanı bilgileri
    db_config = {
        "host": "localhost",
        "port": "5432",
        "user": "postgres",
        "password": "7362",
        "dbname": "finance"
    }
    
    # 6) Yükleme
    for ticker, spark_df in transformed_data.items():
        # Ticker hangi tabloya gidecek?
        if ticker in stock_tickers:
            table_name = "stock_data"
        elif ticker in index_tickers:
            table_name = "index_data"
        else:
            table_name = "macro_data"
        
        # Spark DF -> Row List
        rows = spark_df.collect()
        load_data(rows, table_name, db_config)
    
    # 7) Spark sonlandır
    spark.stop()

if __name__ == "__main__":
    main()


25/01/08 18:29:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


Fetching data for AAPL...
Fetching data for NVDA...
Fetching data for MSFT...
Fetching data for AVGO...
Fetching data for META...
Fetching data for AMZN...
Fetching data for TSLA...
Fetching data for ^GSPC...
Fetching data for NQ=F...
Fetching data for RTY=F...
Fetching data for ^DJI...
Fetching data for GC=F...


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
25/01/08 18:29:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Fetching data for CL=F...
Fetching data for ^VIX...
Fetching data for DX-Y.NYB...
Fetching data for ^IRX...
Fetching data for ^TNX...


25/01/08 18:29:40 ERROR Executor: Exception in task 3.0 in stage 3.0 (TID 27)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1100, in main
    raise PySparkRuntimeError(
    ...<5 lines>...
    )
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 13) than that in driver 3.10, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:52

Py4JJavaError: An error occurred while calling o1285.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 3.0 failed 1 times, most recent failure: Lost task 4.0 in stage 3.0 (TID 28) (161.9.106.98 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1100, in main
    raise PySparkRuntimeError(
    ...<5 lines>...
    )
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 13) than that in driver 3.10, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4149)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4146)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1100, in main
    raise PySparkRuntimeError(
    ...<5 lines>...
    )
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 13) than that in driver 3.10, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


25/01/08 20:44:13 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 958477 ms exceeds timeout 120000 ms
25/01/08 20:44:13 WARN SparkContext: Killing executors is not supported by current scheduler.
25/01/08 20:48:26 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$