# Step 2: Data Analysis Using Spark Core

In [171]:
from pyspark import SparkConf, SparkContext

##A. Blockchain Data Analysis – Part 1

In [35]:
conf = SparkConf().setAppName("BlockchainAnalysis")
sc = SparkContext(conf=conf)

blockchain_data = '/project1/blockchain_data.csv'
data = sc.textFile(blockchain_data)

###1. How many total blocks are there in your dataset?

In [36]:
total_blocks = data.count()
print "Total Blocks:", total_blocks

Total Blocks: 1003


###2. What is the largest block height among the blocks in your dataset?

In [49]:
block_heights = data.filter(lambda line: "height" not in line).map(lambda line: int(line.split(",")[1].strip()))
largest_block_height = block_heights.max()
print("Largest Block Height:", largest_block_height)

('Largest Block Height:', 820527)


###3. What is the date and time for that block?

In [50]:
largest_block_info = data.filter(lambda line: "height" not in line).\
    filter(lambda line: int(line.split(",")[1].strip()) == largest_block_height).collect()[0]
date, time = largest_block_info.split(",")[0].strip(), largest_block_info.split(",")[3].strip()
print("Date and Time for the Largest Block:", date, time)

('Date and Time for the Largest Block:', u'2023-12-10', u'1702187812')


###4. What is the highest number of transactions in your blocks?

In [None]:
highest_transactions = 0

for block in data:
    num_transactions = block.get('n_tx', 0)
    if num_transactions > highest_transactions:
        highest_transactions = num_transactions

print("Highest number of transactions in a block:", highest_transactions)

##B. Stock Market Data Analysis

In [26]:
stock_data = '/project1/mydata/stock-data-.*'
data = sc.textFile(stock_data)

In [27]:
parsed_data = data.map(lambda line: line.split(','))

In [31]:
data.collect()

[u'AAPL, 2023-10-23 19:06:00, 172.8950, 172.9200, 172.9800, 172.8100, 10',
 u'MSFT, 2023-10-23 19:06:00, 330.1400, 330.2900, 330.2900, 330.1300, 32',
 u'GOOGL, 2023-10-23 19:06:00, 137.2000, 137.2400, 137.2800, 137.2000, 155',
 u'TSLA, 2023-10-23 19:06:00, 213.3500, 213.2500, 213.3500, 213.2500, 1886',
 u'AMZN, 2023-10-23 19:06:00, 127.0000, 127.0300, 127.0300, 126.9600, 198',
 u'AAPL, 2023-10-23 19:06:00, 172.8950, 172.9200, 172.9800, 172.8100, 10',
 u'MSFT, 2023-10-23 19:06:00, 330.1400, 330.2900, 330.2900, 330.1300, 32',
 u'GOOGL, 2023-10-23 19:06:00, 137.2000, 137.2400, 137.2800, 137.2000, 155',
 u'TSLA, 2023-10-23 19:06:00, 213.3500, 213.2500, 213.3500, 213.2500, 1886',
 u'AMZN, 2023-10-23 19:06:00, 127.0000, 127.0300, 127.0300, 126.9600, 198',
 u'AAPL, 2023-10-23 19:06:00, 172.8950, 172.9200, 172.9800, 172.8100, 10',
 u'MSFT, 2023-10-23 19:06:00, 330.1400, 330.2900, 330.2900, 330.1300, 32',
 u'GOOGL, 2023-10-23 19:06:00, 137.2000, 137.2400, 137.2800, 137.2000, 155',
 u'TSLA, 2023

### 1. How many records are there in the table?

In [33]:
record_count = parsed_data.count()
print "Total records: {}".format(record_count)

Total records: 45


### 2. How many different days are there in the table?

In [37]:
distinct_days_count = parsed_data.map(lambda values: values[1]).distinct().count()
print "Total different days: {}".format(distinct_days_count)

Total different days: 2


### 3. How many records per each day are there in the table?

In [34]:
records_per_day = parsed_data.map(lambda values: (values[1], 1)).reduceByKey(lambda x, y: x + y)
print "Records per day:"
records_per_day.collect()

Records per day:


[(u' 2023-10-24 19:21:00', 25), (u' 2023-10-23 19:06:00', 20)]

### 4. What are the symbols in the table?

In [38]:
symbols = parsed_data.map(lambda values: values[0]).distinct()
print "Symbols:"
print symbols.collect()

Symbols:
[u'MSFT', u'AAPL', u'AMZN', u'TSLA', u'GOOGL']


### 5. What is the highest price for each symbol?

In [39]:
highest_price_per_symbol = parsed_data.map(lambda values: (values[0], float(values[3])))
max_prices = highest_price_per_symbol.reduceByKey(lambda x, y: max(x, y))
print "Highest price per symbol:"
print max_prices.collect()

Highest price per symbol:
[(u'MSFT', 344.0), (u'AAPL', 172.92), (u'AMZN', 127.03), (u'TSLA', 216.03), (u'GOOGL', 137.24)]


### 6. What is the lowest price for each symbol?

In [40]:
lowest_price_per_symbol = parsed_data.map(lambda values: (values[0], float(values[4])))
min_prices = lowest_price_per_symbol.reduceByKey(lambda x, y: min(x, y))
print "Lowest price per symbol:"
print min_prices.collect()

Lowest price per symbol:
[(u'MSFT', 330.29), (u'AAPL', 172.98), (u'AMZN', 126.65), (u'TSLA', 213.35), (u'GOOGL', 130.08)]


### 7. What is the average price for each symbol?

In [42]:
average_price_per_symbol = parsed_data.map(lambda values: (values[0], float(values[2])))
total_prices = average_price_per_symbol.combineByKey(lambda value: (value, 1),
                                                     lambda x, value: (x[0] + value, x[1] + 1),
                                                     lambda x, y: (x[0] + y[0], x[1] + y[1]))
average_prices = total_prices.mapValues(lambda x: x[0] / x[1])
print "Average price per symbol:"
print average_prices.collect()

Average price per symbol:
[(u'MSFT', 337.22), (u'AAPL', 172.964), (u'AMZN', 126.65555555555557), (u'TSLA', 214.83055555555555), (u'GOOGL', 133.23888888888888)]


### 8. What is the range of price for each symbol?

In [43]:
price_range_per_symbol = parsed_data.map(lambda values: (values[0], (float(values[2]), float(values[3]), float(values[4]))))
range_per_symbol = price_range_per_symbol.reduceByKey(lambda x, y: (min(x[0], y[0]), max(x[1], y[1]), min(x[2], y[2])))
print "Price range per symbol:"
print range_per_symbol.collect()

Price range per symbol:
[(u'MSFT', (330.14, 344.0, 330.29)), (u'AAPL', (172.895, 172.92, 172.98)), (u'AMZN', (126.38, 127.03, 126.65)), (u'TSLA', (213.35, 216.03, 213.35)), (u'GOOGL', (130.07, 137.24, 130.08))]


### 9. What is the date on which each symbol experienced the highest price?

In [44]:
date_highest_price_per_symbol = parsed_data.map(lambda values: (values[0], (values[1], float(values[3]))))
max_date_price_per_symbol = date_highest_price_per_symbol.reduceByKey(lambda x, y: max(x, y, key=lambda z: z[1]))
print "Date on which each symbol experienced the highest price:"
print max_date_price_per_symbol.collect()

Date on which each symbol experienced the highest price:
[(u'MSFT', (u' 2023-10-24 19:21:00', 344.0)), (u'AAPL', (u' 2023-10-23 19:06:00', 172.92)), (u'AMZN', (u' 2023-10-23 19:06:00', 127.03)), (u'TSLA', (u' 2023-10-24 19:21:00', 216.03)), (u'GOOGL', (u' 2023-10-23 19:06:00', 137.24))]


In [170]:
sc.stop() 

## C. Blockchain Data Analysis – Part 2

In [172]:
conf = SparkConf().setAppName("Block_data")
sc = SparkContext(conf=conf)

In [173]:
table_1 = '/project1/Part_C/blocks_2023_Sep_10_to_15.csv'
table_2 = '/project1/Part_C/blocks_info_2023_Sep_10_to_15.csv'
table_3 = '/project1/Part_C/tx_info_2023_Sep_10_to_15.csv'

In [174]:
data_1 = sc.textFile(table_1)
data_2 = sc.textFile(table_2)
data_3 = sc.textFile(table_3)

In [175]:
table1_header = data_1.first()
table2_header = data_2.first()
table3_header = data_3.first()
table1_data = data_1.filter(lambda line: line != table1_header)
table2_data = data_2.filter(lambda line: line != table2_header)
table3_data = data_3.filter(lambda line: line != table3_header)

### 1. How many total blocks are there in your blocks table?

In [28]:
total_blocks = table1_data.count()
print "Total blocks: {}".format(total_blocks)

Total blocks: 920


### 2. What is the largest block height among the blocks in your blocks table?

In [195]:
table1_rdd = table1_data.map(lambda line: (line.split(',')[0], int(line.split(',')[3])))
table2_rdd = table2_data.map(lambda line: (line.split(',')[0], int(line.split(',')[9])))

union_rdd = table1_rdd.union(table2_rdd)

largest_block_height = union_rdd.max()
print "Largest block height: {}".format(largest_block_height)

Largest block height: (u'99', 806883)


### 3. What is the date and time for that block?

In [177]:
table1_rdd = table1_data.map(lambda line: (int(line.split(',')[3]), line.split(',')[2]))

filtered_rdd = table1_rdd.filter(lambda x: x[0] == 807290)

result = filtered_rdd.collect()

if result:
    print("Time for Block 807290: {}".format(result[0][1]))
else:
    print("Block 807290 not found")

Time for Block 807290: 12-09-2023 00:00


###4. What is the largest number of transactions in your blocks?

In [31]:
table3_rdd = table3_data.map(lambda line: (int(line.split(',')[15]), 1))

block_index_counts = table3_rdd.reduceByKey(lambda x, y: x + y)

largest_tx = block_index_counts.max(key=lambda x: x[1])

print("Largest Transaction: {}, Count: {}".format(largest_tx[0], largest_tx[1]))


Largest Transaction: 807118, Count: 7252


In [91]:
sc.stop()

#Step 3: Data Analysis Using Spark SQL

In [85]:
from pyspark import SQLContext

## A. Blockchain Data Analysis – Part 1

In [92]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Create a Spark context
sc = SparkContext("local", "BlockchainAnalysis")
sqlContext = SQLContext(sc)

# Load the data from HDFS
data = sc.textFile("/project1/blockchain_data.csv")

# Parse the CSV data into RDD
header = data.first()
data = data.filter(lambda line: line != header).map(lambda line: line.split(','))

# Convert height to integer for comparison
data = data.map(lambda x: (x[0], int(x[1]), x[2], int(x[3])))


###1. How many total blocks are there in your dataset?

In [28]:
total_blocks = data.count()
print("Total Blocks:", total_blocks)

('Total Blocks:', 1003)


###2. What is the largest block height among the blocks in your dataset?

In [94]:
largest_block = data.max(lambda x: x[1])
largest_block_height = largest_block[1]
print("Largest Block Height:", largest_block_height)

('Largest Block Height:', 820527)


###3. What is the date and time for that block?

In [95]:
largest_block_info = data.filter(lambda x: x[1] == largest_block_height).map(lambda x: (x[0], x[3])).collect()
print("Date and Time for the Largest Block Height:", largest_block_info)

('Date and Time for the Largest Block Height:', [(u'2023-12-10', 1702187812)])


###4. What is the highest number of transactions in your blocks?

In [97]:
highest_transactions = data.max(lambda x: x[3])
print("Highest Number of Transactions:", highest_transactions)

('Highest Number of Transactions:', (u'2023-12-10', 820527, u'000000000000000000027333dc0ccb403a30c521052e23b8cc898a64c8632f9a', 1702187812))


In [112]:
sc.stop()

##B. Stock Market Data Analysis

In [113]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row

sc = SparkContext("local", "StockDataAnalysis")
sqlContext = SQLContext(sc)

# Assuming you have an RDD named 'your_rdd' containing the stock data
your_rdd = sc.textFile("/project1/mydata/stock-data-.*")

def parse_line(line):
    parts = line.split(',')
    return Row(Symbol=parts[0], Timestamp=parts[1], 
               Open=float(parts[2]), Close=float(parts[3]),  # Fix the index for 'Close'
               High=float(parts[4]), Low=float(parts[5]), 
               Volume=int(parts[6]))

# Converting RDD to DataFrame
df = sqlContext.createDataFrame(your_rdd.map(parse_line))

# Registering the DataFrame as a temp table
df.registerTempTable("stock_data")

result = sqlContext.sql("SELECT * FROM stock_data")
result.show()


Close  High   Low    Open    Symbol Timestamp            Volume
172.92 172.98 172.81 172.895 AAPL    2023-10-23 19:06:00 10    
330.29 330.29 330.13 330.14  MSFT    2023-10-23 19:06:00 32    
137.24 137.28 137.2  137.2   GOOGL   2023-10-23 19:06:00 155   
213.25 213.35 213.25 213.35  TSLA    2023-10-23 19:06:00 1886  
127.03 127.03 126.96 127.0   AMZN    2023-10-23 19:06:00 198   
172.92 172.98 172.81 172.895 AAPL    2023-10-23 19:06:00 10    
330.29 330.29 330.13 330.14  MSFT    2023-10-23 19:06:00 32    
137.24 137.28 137.2  137.2   GOOGL   2023-10-23 19:06:00 155   
213.25 213.35 213.25 213.35  TSLA    2023-10-23 19:06:00 1886  
127.03 127.03 126.96 127.0   AMZN    2023-10-23 19:06:00 198   
172.92 172.98 172.81 172.895 AAPL    2023-10-23 19:06:00 10    
330.29 330.29 330.13 330.14  MSFT    2023-10-23 19:06:00 32    
137.24 137.28 137.2  137.2   GOOGL   2023-10-23 19:06:00 155   
213.25 213.35 213.25 213.35  TSLA    2023-10-23 19:06:00 1886  
127.03 127.03 126.96 127.0   AMZN    202

###1. How many records are there in the table?

In [134]:
count = sqlContext.sql("SELECT COUNT(*) as record_count FROM stock_data").first().record_count
print("Number of Records: {}".format(count))

Number of Records: 45


### 2. How many different days are there in the table?

In [118]:
distinct_days_count = sqlContext.sql("SELECT COUNT(DISTINCT `Timestamp`) FROM stock_data").collect()[0][0]
print("Total different days:", distinct_days_count)

('Total different days:', 2)


###3. How many records per each day are there in the table?


In [122]:
records_per_day = sqlContext.sql("SELECT `Timestamp`, COUNT(*) as RecordCount FROM stock_data GROUP BY `Timestamp`")
records_per_day.show()

Timestamp            RecordCount
 2023-10-24 19:21:00 25         
 2023-10-23 19:06:00 20         


###4. What are the symbols in the table?

In [123]:
symbols = sqlContext.sql("SELECT DISTINCT `Symbol` FROM stock_data")
symbols.show()

Symbol
AAPL  
GOOGL 
AMZN  
MSFT  
TSLA  


###5. What is the highest price for each symbol?

In [124]:
highest_price_per_symbol = sqlContext.sql("SELECT `Symbol`, MAX(`High`) as HighestPrice FROM stock_data GROUP BY `Symbol`")
highest_price_per_symbol.show()

Symbol HighestPrice
AAPL   173.01      
GOOGL  137.28      
AMZN   127.03      
MSFT   344.37      
TSLA   216.05      


###6. What is the lowest price for each symbol?

In [125]:
lowest_price_per_symbol = sqlContext.sql("SELECT `Symbol`, MIN(`Low`) as LowestPrice FROM stock_data GROUP BY `Symbol`")
lowest_price_per_symbol.show()

Symbol LowestPrice
AAPL   172.81     
GOOGL  129.99     
AMZN   126.36     
MSFT   330.13     
TSLA   213.25     


###7. What is the average price for each symbol?

In [126]:
average_price_per_symbol = sqlContext.sql("SELECT `Symbol`, AVG(`Close`) as AveragePrice FROM stock_data GROUP BY `Symbol`")
average_price_per_symbol.show()

Symbol AveragePrice      
AAPL   172.914           
GOOGL  133.21777777777777
AMZN   126.7188888888889 
MSFT   337.145           
TSLA   214.79444444444445


###8. What is the range of price for each symbol?

In [135]:
price_range_per_symbol = sqlContext.sql("SELECT `Symbol`, MAX(`High`) - MIN(`Low`) as PriceRange FROM stock_data GROUP BY `Symbol`")
price_range_per_symbol.show()

Symbol PriceRange         
AAPL   0.19999999999998863
GOOGL  7.289999999999992  
AMZN   0.6700000000000017 
MSFT   14.240000000000009 
TSLA   2.8000000000000114 


###9. What is the date on which each symbol experienced the highest price?


In [133]:
date_highest_price = sqlContext.sql("SELECT `Symbol`, MAX(`Timestamp`) AS highest_date, MAX(`High`) AS highest_price FROM stock_data GROUP BY`Symbol`")
date_highest_price.show()

Symbol highest_date         highest_price
AAPL    2023-10-24 19:21:00 173.01       
GOOGL   2023-10-24 19:21:00 137.28       
AMZN    2023-10-24 19:21:00 127.03       
MSFT    2023-10-24 19:21:00 344.37       
TSLA    2023-10-24 19:21:00 216.05       


In [153]:
sc.stop()

## C. Blockchain Data Analysis – Part 2

In [154]:
from pyspark.sql import SQLContext

sc = SparkContext("local", "Block_info")
sqlCtx = SQLContext(sc)

In [163]:
blocks = sqlCtx.load(source="jdbc", 
                         url="jdbc:mysql://localhost/project1?user=training&password=training", 
                         dbtable="table_1")

blocks_info = sqlCtx.load(source="jdbc", 
                         url="jdbc:mysql://localhost/project1?user=training&password=training", 
                         dbtable="table_2")

tx_info = sqlCtx.load(source="jdbc", 
                         url="jdbc:mysql://localhost/project1?user=training&password=training", 
                         dbtable="table_3")

In [164]:
blocks.registerTempTable("blocks")
blocks_info.registerTempTable("blocks_info")
tx_info.registerTempTable("tx_info")

###1. How many total blocks are there in your blocks table?

In [165]:
total_blocks = sqlCtx.sql("SELECT COUNT(*) as total_blocks FROM blocks")
total_blocks.show()

total_blocks
921         


###2. What is the largest block height among the blocks in your blocks table?

In [166]:
max_block_height = sqlCtx.sql("SELECT MAX(height) as max_block_height FROM blocks_info")
max_block_height.show()

max_block_height
807290          


###3. What is the date and time for that block?

In [168]:
date_time_for_specific_block = sqlCtx.sql("""
    SELECT time as date_time
    FROM blocks
    WHERE block_index = 807290
""")

date_time_for_specific_block.show()

date_time       
12-09-2023 00:00


###4. What is the largest number of transactions in your blocks?

In [169]:
lrgst_number_transactions = tx_info.groupBy("block_index").count().agg({"count": "max"}).collect()[0][0]
print("Largest number of transactions in a block:", lrgst_number_transactions)

('Largest number of transactions in a block:', 7252)
