In [None]:

"""
NOAA GHCN-Daily Data Exploration Notebook

Comprehensive analysis of the ingested weather data including:
- Data structure and schema analysis
- Volume and distribution metrics
- Data consistency checks
- Privacy risk assessment
- Visualizations and insights
"""



In [8]:
import warnings
warnings.filterwarnings('ignore')

from pathlib import Path
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime
import numpy as np
import os

In [3]:
# Set plotting style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 10

## Setup & Data Loading

In [5]:
# Initialize Spark
spark = SparkSession.builder \
    .appName("GHCN-Data-Exploration") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/14 21:47:23 WARN Utils: Your hostname, Jonathans-MacBook-Pro-2.local, resolves to a loopback address: 127.0.0.1; using 192.168.100.146 instead (on interface en0)
25/12/14 21:47:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/14 21:47:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
# Define paths
BASE_DIR = Path("../ghcn_data/processed")
WEATHER_PATH = BASE_DIR / "weather_observations"
STATIONS_PATH = BASE_DIR / "stations_metadata"
SUMMARY_PATH = BASE_DIR / "summary_statistics"

In [11]:
print("Loading datasets...")
print("-"*80)

# Load data
weather_df = spark.read.parquet(str(WEATHER_PATH))
stations_df = spark.read.parquet(str(STATIONS_PATH))
summary_df = spark.read.parquet(str(SUMMARY_PATH))

print(f"Weather observations loaded: {weather_df.count():,} records")
print(f"Station metadata loaded: {stations_df.count():,} stations")
print(f"Summary statistics loaded: {summary_df.count():,} entries\n")

Loading datasets...
--------------------------------------------------------------------------------


                                                                                

Weather observations loaded: 74,785,736 records
Station metadata loaded: 129,658 stations
Summary statistics loaded: 1,704 entries



## Data Structure Analysis

In [12]:
print("\n" + "="*80)
print("SECTION 1: DATA STRUCTURE ANALYSIS")
print("="*80)

print("\n1.1 WEATHER OBSERVATIONS SCHEMA")
print("-"*80)
weather_df.printSchema()

print("\n1.2 STATIONS METADATA SCHEMA")
print("-"*80)
stations_df.printSchema()

print("\n1.3 SAMPLE WEATHER RECORDS")
print("-"*80)
weather_sample_pd = weather_df.limit(5).toPandas()
print(weather_sample_pd.to_string())

print("\n1.4 SAMPLE STATION RECORDS")
print("-"*80)
stations_sample_pd = stations_df.limit(5).toPandas()
print(stations_sample_pd.to_string())

print("\n1.5 DATA TYPES SUMMARY")
print("-"*80)
print("\nWeather Observations:")
for field in weather_df.schema.fields:
    print(f"  {field.name:25s} : {field.dataType}")

print("\nStations Metadata:")
for field in stations_df.schema.fields:
    print(f"  {field.name:25s} : {field.dataType}")


SECTION 1: DATA STRUCTURE ANALYSIS

1.1 WEATHER OBSERVATIONS SCHEMA
--------------------------------------------------------------------------------
root
 |-- station_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- element: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- mflag: string (nullable = true)
 |-- qflag: string (nullable = true)
 |-- sflag: string (nullable = true)
 |-- obs_time: string (nullable = true)
 |-- value_converted: double (nullable = true)
 |-- quality_passed: boolean (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- ingestion_date: date (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- elevation: string (nullable = true)
 |-- state: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gsn_flag: string (nullable = true)
 |-- hcn_flag: string (nullable = true)
 |-- wmo_id: string (nullable = true)
 |-- year: integer (nullable = true)

### Data Volume

In [13]:
print("\n" + "="*80)
print("SECTION 2: VOLUME ANALYSIS")
print("="*80)

print("\n2.1 OVERALL VOLUME METRICS")
print("-"*80)

# Total records
total_records = weather_df.count()
total_stations = weather_df.select("station_id").distinct().count()
date_range = weather_df.agg(
    F.min("date").alias("min_date"),
    F.max("date").alias("max_date")
).collect()[0]

print(f"Total weather observations: {total_records:,}")
print(f"Unique stations: {total_stations:,}")
print(f"Date range: {date_range['min_date']} to {date_range['max_date']}")
print(f"Time span: {(date_range['max_date'] - date_range['min_date']).days} days")


SECTION 2: VOLUME ANALYSIS

2.1 OVERALL VOLUME METRICS
--------------------------------------------------------------------------------




Total weather observations: 74,785,736
Unique stations: 46,177
Date range: 2023-01-01 to 2024-12-31
Time span: 730 days


                                                                                

In [15]:
# Element type distribution
print("\n2.2 WEATHER ELEMENT DISTRIBUTION")
print("-"*80)
element_counts = weather_df.groupBy("element").count() \
    .orderBy(F.desc("count")) \
    .toPandas()
print(element_counts.to_string(index=False))


2.2 WEATHER ELEMENT DISTRIBUTION
--------------------------------------------------------------------------------


                                                                                

element    count
   PRCP 22433319
   SNOW 11231483
   TMAX  8456899
   TMIN  8454108
   SNWD  6346826
   TAVG  4329796
   TOBS  3171741
   WESD  1077245
   AWND   835024
   WSF2   787762
   WDF2   787536
   WSF5   762236
   WDF5   762021
   WESF   655095
   RHMX   334029
   RHMN   334029
   RHAV   333856
   ADPT   332554
   AWBT   332554
   ASLP   331833
   ASTP   331833
   WT01   326269
   DAPR   259449
   MDPR   257284
   PGTM   200779
   WSFG   198723
   WDFG   186609
   WT03   129567
   SX32    92834
   SN32    92802
   WT08    87837
   EVAP    81493
   WDMV    64706
   MXPN    43865
   WT02    43676
   MNPN    43528
   SX52    34918
   SN52    34161
   WSFI    24976
   AWDR    23247
   DWPR    11839
   SN31    10890
   SX31    10866
   SX33     8553
   SN33     8547
   WT11     6912
   WT06     6644
   DATX     5813
   MDTX     5813
   DATN     5729
   MDTN     5729
   SX53     5097
   SN53     5075
   WT04     4799
   WT05     4710
   SX35     4120
   SN35     3382
   THIC     28

In [16]:
# Temporal distribution
print("\n2.3 TEMPORAL DISTRIBUTION")
print("-"*80)
temporal_dist = weather_df.groupBy("year", "month").count() \
    .orderBy("year", "month") \
    .toPandas()
print(f"Total year-month combinations: {len(temporal_dist)}")
print(f"Average records per month: {temporal_dist['count'].mean():,.0f}")
print(f"Median records per month: {temporal_dist['count'].median():,.0f}")


2.3 TEMPORAL DISTRIBUTION
--------------------------------------------------------------------------------




Total year-month combinations: 24
Average records per month: 3,116,072
Median records per month: 3,111,894


                                                                                

In [18]:
# Geographic distribution
print("\n2.4 GEOGRAPHIC DISTRIBUTION")
print("-"*80)
geo_dist = weather_df.groupBy("state").agg(
    F.count("*").alias("observation_count"),
    F.countDistinct("station_id").alias("station_count")
).orderBy(F.desc("observation_count"))

print("\nTop 10 States by Observation Count:")
geo_dist_pd = geo_dist.limit(10).toPandas()
print(geo_dist_pd.to_string(index=False))


2.4 GEOGRAPHIC DISTRIBUTION
--------------------------------------------------------------------------------

Top 10 States by Observation Count:




state  observation_count  station_count
                14162873           9762
   TX            4235173           3071
   CO            2998321           1835
   CA            2821501           1488
   MN            1907567           1842
   OR            1894115            951
   MT            1697818            589
   KS            1674588           1354
   NC            1667712           1292
   WI            1583757           1019


                                                                                

## Data Consistency

In [19]:
print("\n" + "="*80)
print("SECTION 3: DATA CONSISTENCY ANALYSIS")
print("="*80)

print("\n3.1 MISSING VALUES ANALYSIS")
print("-"*80)

# Calculate null percentages for each column
null_analysis = []
for col in weather_df.columns:
    null_count = weather_df.filter(F.col(col).isNull()).count()
    null_pct = (null_count / total_records) * 100
    null_analysis.append({
        'column': col,
        'null_count': null_count,
        'null_percentage': null_pct
    })

null_df = pd.DataFrame(null_analysis).sort_values('null_percentage', ascending=False)
print(null_df[null_df['null_percentage'] > 0].to_string(index=False))


SECTION 3: DATA CONSISTENCY ANALYSIS

3.1 MISSING VALUES ANALYSIS
--------------------------------------------------------------------------------


                                                                                

  column  null_count  null_percentage
   qflag    74703163        99.889587
   mflag    68931040        92.171373
obs_time    37263857        49.827493


In [21]:
print("\n3.2 DATA QUALITY FLAGS ANALYSIS")
print("-"*80)
quality_summary = weather_df.groupBy("quality_passed").count().toPandas()
quality_summary['percentage'] = (quality_summary['count'] / total_records) * 100
print(quality_summary.to_string(index=False))

failed_quality = weather_df.filter(~F.col("quality_passed")) \
    .groupBy("element", "qflag").count() \
    .orderBy(F.desc("count")) \
    .limit(10) \
    .toPandas()

if len(failed_quality) > 0:
    print("\nTop Quality Flag Issues:")
    print(failed_quality.to_string(index=False))


3.2 DATA QUALITY FLAGS ANALYSIS
--------------------------------------------------------------------------------
 quality_passed    count  percentage
           True 74703163   99.889587
          False    82573    0.110413

Top Quality Flag Issues:
element qflag  count
   PRCP     L  11087
   DAPR     L   8988
   TMIN     I   7983
   MDPR     L   6821
   TMAX     I   5684
   TOBS     I   4402
   PRCP     Z   4240
   SNWD     I   2996
   TMAX     K   1867
   MNPN     D   1788


                                                                                

In [22]:
print("\n3.3 DUPLICATE ANALYSIS")
print("-"*80)

# Check for duplicates
duplicate_check = weather_df.groupBy("station_id", "date", "element") \
    .count() \
    .filter(F.col("count") > 1)

duplicate_count = duplicate_check.count()
print(f"Duplicate records (same station, date, element): {duplicate_count:,}")

if duplicate_count > 0:
    print("\nSample duplicates:")
    print(duplicate_check.limit(5).toPandas().to_string(index=False))


3.3 DUPLICATE ANALYSIS
--------------------------------------------------------------------------------




Duplicate records (same station, date, element): 0

3.4 VALUE RANGE CONSISTENCY
--------------------------------------------------------------------------------


                                                                                

In [23]:
print("\n3.4 VALUE RANGE CONSISTENCY")
print("-"*80)

# Temperature consistency
temp_df = weather_df.filter(F.col("element").isin(["TMAX", "TMIN", "TAVG"]))
temp_stats = temp_df.groupBy("element").agg(
    F.min("value_converted").alias("min_temp_c"),
    F.max("value_converted").alias("max_temp_c"),
    F.avg("value_converted").alias("avg_temp_c"),
    F.stddev("value_converted").alias("stddev_temp_c")
).toPandas()

print("Temperature Statistics (°C):")
print(temp_stats.to_string(index=False))


3.4 VALUE RANGE CONSISTENCY
--------------------------------------------------------------------------------




Temperature Statistics (°C):
element  min_temp_c  max_temp_c  avg_temp_c  stddev_temp_c
   TMIN       -82.0       482.2    5.984023      11.356715
   TMAX      -999.0       807.8   17.384749      12.496617
   TAVG       -99.9       482.2   11.166500      13.398363


                                                                                

In [24]:
# Check for outliers
outlier_threshold_high = 60
outlier_threshold_low = -90

temp_outliers = temp_df.filter(
    (F.col("value_converted") > outlier_threshold_high) |
    (F.col("value_converted") < outlier_threshold_low)
).count()

print(f"\nTemperature outliers (beyond ±90°C to 60°C): {temp_outliers:,}")




Temperature outliers (beyond ±90°C to 60°C): 57


                                                                                

In [25]:
# Precipitation consistency
precip_df = weather_df.filter(F.col("element") == "PRCP")
precip_stats = precip_df.agg(
    F.min("value_converted").alias("min_mm"),
    F.max("value_converted").alias("max_mm"),
    F.avg("value_converted").alias("avg_mm")
).collect()[0]

print(f"\nPrecipitation Statistics (mm):")
print(f"  Min: {precip_stats['min_mm']:.2f}")
print(f"  Max: {precip_stats['max_mm']:.2f}")
print(f"  Avg: {precip_stats['avg_mm']:.2f}")





Precipitation Statistics (mm):
  Min: 0.00
  Max: 2565.40
  Avg: 2.76


                                                                                

In [28]:
print("\n3.5 TEMPORAL CONSISTENCY")
print("-"*80)

# Check for gaps in time series
station_date_coverage = weather_df.groupBy("station_id").agg(
    F.min("date").alias("first_date"),
    F.max("date").alias("last_date"),
    F.countDistinct("date").alias("unique_dates")
).withColumn(
    "expected_dates",
    F.datediff(F.col("last_date"), F.col("first_date")) + 1
).withColumn(
    "coverage_pct",
    (F.col("unique_dates") / F.col("expected_dates")) * 100
)

coverage_stats = station_date_coverage.agg(
    F.avg("coverage_pct").alias("avg_coverage"),
    F.min("coverage_pct").alias("min_coverage"),
    F.max("coverage_pct").alias("max_coverage")
).collect()[0]

print(f"Station Temporal Coverage:")
print(f"  Average: {coverage_stats['avg_coverage']:.2f}%")
print(f"  Min: {coverage_stats['min_coverage']:.2f}%")
print(f"  Max: {coverage_stats['max_coverage']:.2f}%")


3.5 TEMPORAL CONSISTENCY
--------------------------------------------------------------------------------




Station Temporal Coverage:
  Average: 82.76%
  Min: 0.31%
  Max: 100.00%


                                                                                

### Privacy Risk

In [29]:
print("\n" + "="*80)
print("SECTION 4: PRIVACY RISK ASSESSMENT")
print("="*80)

print("\n4.1 DATA CLASSIFICATION")
print("-"*80)
print("""
Weather observation data classification:
  
 PUBLIC DATA - NO PRIVACY CONCERNS
  - Weather measurements are aggregate environmental data
  - Station locations are public infrastructure
  - No personally identifiable information (PII)
  - No sensitive personal data
  
Data contains:
  - Station IDs (public infrastructure codes)
  - Geographic coordinates (public locations)
  - Temperature, precipitation, weather measurements
  - Station names and metadata
""")

print("4.2 IDENTIFIABLE INFORMATION CHECK")
print("-"*80)

# Check for any potentially sensitive fields
sensitive_fields = []
for field in weather_df.columns:
    field_lower = field.lower()
    if any(term in field_lower for term in ['name', 'email', 'phone', 'address', 
                                              'ssn', 'id', 'person', 'user']):
        sensitive_fields.append(field)

print(f"Fields with potentially sensitive names: {sensitive_fields}")
print("\nAnalysis of flagged fields:")

for field in sensitive_fields:
    if field in ['station_id']:
        print(f"  - {field}: Public infrastructure identifier (NO RISK)")


SECTION 4: PRIVACY RISK ASSESSMENT

4.1 DATA CLASSIFICATION
--------------------------------------------------------------------------------

Weather observation data classification:
  
 PUBLIC DATA - NO PRIVACY CONCERNS
  - Weather measurements are aggregate environmental data
  - Station locations are public infrastructure
  - No personally identifiable information (PII)
  - No sensitive personal data
  
Data contains:
  - Station IDs (public infrastructure codes)
  - Geographic coordinates (public locations)
  - Temperature, precipitation, weather measurements
  - Station names and metadata

4.2 IDENTIFIABLE INFORMATION CHECK
--------------------------------------------------------------------------------
Fields with potentially sensitive names: ['station_id', 'name', 'wmo_id']

Analysis of flagged fields:
  - station_id: Public infrastructure identifier (NO RISK)


In [30]:
print("\n4.3 RE-IDENTIFICATION RISK")
print("-"*80)
print("""
Re-identification risk: NONE

Reasons:
  1. Data is aggregated at station level (not individual level)
  2. Weather stations are public infrastructure
  3. Measurements are environmental, not behavioral
  4. No link to individuals or households
  5. Station locations are intentionally public
  
Recommendation: No anonymization required. Data is safe for public use.
""")


4.3 RE-IDENTIFICATION RISK
--------------------------------------------------------------------------------

Re-identification risk: NONE

Reasons:
  1. Data is aggregated at station level (not individual level)
  2. Weather stations are public infrastructure
  3. Measurements are environmental, not behavioral
  4. No link to individuals or households
  5. Station locations are intentionally public
  
Recommendation: No anonymization required. Data is safe for public use.



In [31]:
print("4.4 COMPLIANCE ASSESSMENT")
print("-"*80)
print("""
Regulatory Compliance:
   GDPR: Not applicable (no personal data)
   CCPA: Not applicable (no consumer data)
   HIPAA: Not applicable (no health data)
   Data is publicly available from NOAA
  
Conclusion: Dataset poses NO privacy risks
""")

4.4 COMPLIANCE ASSESSMENT
--------------------------------------------------------------------------------

Regulatory Compliance:
   GDPR: Not applicable (no personal data)
   CCPA: Not applicable (no consumer data)
   HIPAA: Not applicable (no health data)
   Data is publicly available from NOAA
  
Conclusion: Dataset poses NO privacy risks



### Data Visualizations

In [32]:
print("\n" + "="*80)
print("SECTION 5: DATA VISUALIZATIONS")
print("="*80)

# Create output directory for plots
output_dir = Path("./ghcn_analysis_plots")
output_dir.mkdir(exist_ok=True)

print(f"\nGenerating visualizations (saved to {output_dir})...")
print("-"*80)


SECTION 5: DATA VISUALIZATIONS

Generating visualizations (saved to ghcn_analysis_plots)...
--------------------------------------------------------------------------------


In [34]:
# 6.1 Element Distribution
print("\n5.1 Generating element distribution plot...")
element_data = weather_df.groupBy("element").count().orderBy(F.desc("count")).toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(data=element_data, x='element', y='count', ax=ax, palette='viridis')
ax.set_title('Distribution of Weather Elements', fontsize=14, fontweight='bold')
ax.set_xlabel('Weather Element', fontsize=12)
ax.set_ylabel('Number of Observations', fontsize=12)
ax.tick_params(axis='x', rotation=45)
for i, v in enumerate(element_data['count']):
    ax.text(i, v, f'{v:,.0f}', ha='center', va='bottom', fontsize=7)
plt.tight_layout()
plt.savefig(output_dir / '01_element_distribution.png', dpi=300, bbox_inches='tight')
plt.close()


5.1 Generating element distribution plot...


                                                                                

In [35]:
# 6.2 Temporal Distribution
print("5.2 Generating temporal distribution plot...")
temporal_data = weather_df.groupBy("year", "month").count().orderBy("year", "month").toPandas()
temporal_data['date'] = pd.to_datetime(temporal_data[['year', 'month']].assign(day=1))

fig, ax = plt.subplots(figsize=(14, 6))
ax.plot(temporal_data['date'], temporal_data['count'], marker='o', linewidth=2, markersize=8)
ax.set_title('Observation Volume Over Time', fontsize=14, fontweight='bold')
ax.set_xlabel('Date', fontsize=12)
ax.set_ylabel('Number of Observations', fontsize=12)
ax.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(output_dir / '02_temporal_distribution.png', dpi=300, bbox_inches='tight')
plt.close()

5.2 Generating temporal distribution plot...


                                                                                

In [36]:
# 6.3 Geographic Distribution (Top 20 states)
print("5.3 Generating geographic distribution plot...")
geo_data = weather_df.groupBy("state").count() \
    .orderBy(F.desc("count")) \
    .limit(20) \
    .toPandas()

fig, ax = plt.subplots(figsize=(14, 8))
sns.barplot(data=geo_data, y='state', x='count', ax=ax, palette='coolwarm')
ax.set_title('Top 20 States by Observation Count', fontsize=14, fontweight='bold')
ax.set_xlabel('Number of Observations', fontsize=12)
ax.set_ylabel('State', fontsize=12)
for i, v in enumerate(geo_data['count']):
    ax.text(v, i, f' {v:,.0f}', va='center', fontsize=9)
plt.tight_layout()
plt.savefig(output_dir / '03_geographic_distribution.png', dpi=300, bbox_inches='tight')
plt.close()

5.3 Generating geographic distribution plot...


                                                                                

In [37]:
# 6.4 Temperature Analysis
print("5.4 Generating temperature analysis plots...")
temp_sample = temp_df.filter(F.col("element") == "TMAX") \
    .sample(fraction=0.01) \
    .toPandas()

fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# Temperature distribution
axes[0].hist(temp_sample['value_converted'], bins=50, edgecolor='black', alpha=0.7, color='orangered')
axes[0].set_title('Maximum Temperature Distribution', fontsize=14, fontweight='bold')
axes[0].set_xlabel('Temperature (°C)', fontsize=12)
axes[0].set_ylabel('Frequency', fontsize=12)
axes[0].axvline(temp_sample['value_converted'].mean(), color='red', linestyle='--', 
                linewidth=2, label=f'Mean: {temp_sample["value_converted"].mean():.1f}°C')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Temperature box plot by month
temp_monthly = temp_df.filter(F.col("element") == "TMAX") \
    .groupBy("month").agg(
        F.avg("value_converted").alias("avg_temp"),
        F.min("value_converted").alias("min_temp"),
        F.max("value_converted").alias("max_temp")
    ).orderBy("month").toPandas()

axes[1].plot(temp_monthly['month'], temp_monthly['avg_temp'], marker='o', 
             linewidth=2, markersize=8, label='Average', color='orangered')
axes[1].fill_between(temp_monthly['month'], temp_monthly['min_temp'], 
                      temp_monthly['max_temp'], alpha=0.3, label='Min-Max Range')
axes[1].set_title('Monthly Temperature Patterns', fontsize=14, fontweight='bold')
axes[1].set_xlabel('Month', fontsize=12)
axes[1].set_ylabel('Temperature (°C)', fontsize=12)
axes[1].set_xticks(range(1, 13))
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig(output_dir / '04_temperature_analysis.png', dpi=300, bbox_inches='tight')
plt.close()

5.4 Generating temperature analysis plots...


                                                                                

25/12/15 05:48:33 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 559806 ms exceeds timeout 120000 ms
25/12/15 05:48:33 WARN SparkContext: Killing executors is not supported by current scheduler.
25/12/15 05:48:34 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$