In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession


In [2]:
#Truy cập vào master của cụm tại địa chỉ 172.19.0.4:7077 với tên chương trình là Stock_price_analysis
spark = SparkSession.builder.master("local").appName("Gold0").getOrCreate()


In [3]:
csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("./newPrices.csv")

In [4]:
csvFile.printSchema()

root
 |-- Date: string (nullable = true)
 |-- US_dollar: string (nullable = true)
 |-- Euro: string (nullable = true)
 |-- Japanese_yen: string (nullable = true)
 |-- Vietnamese_dong: string (nullable = true)
 |-- Korean_won: string (nullable = true)



In [5]:
csvFile.count()

10941

In [6]:
csvFile.show()

+----------+---------+-----+------------+---------------+----------+
|      Date|US_dollar| Euro|Japanese_yen|Vietnamese_dong|Korean_won|
+----------+---------+-----+------------+---------------+----------+
|12/29/1978|      226|137.1|        #N/A|           #N/A|      #N/A|
|  1/1/1979|      226|137.1|        #N/A|           #N/A|      #N/A|
|  1/2/1979|    226.8|137.3|   43,164.90|           #N/A|107,470.00|
|  1/3/1979|    218.6|  134|   43,717.90|           #N/A|108,027.40|
|  1/4/1979|    223.2|136.8|   43,674.90|           #N/A|108,602.50|
|  1/5/1979|    225.5|138.4|   44,582.50|           #N/A|110,510.40|
|  1/8/1979|    223.1|136.4|   44,436.20|           #N/A|110,356.30|
|  1/9/1979|      224|137.3|   44,045.60|           #N/A|109,248.40|
| 1/10/1979|    220.7|135.5|   43,366.40|           #N/A|108,108.30|
| 1/11/1979|    220.7|135.9|   43,770.60|           #N/A|108,771.70|
| 1/12/1979|    217.6|134.1|   42,837.10|           #N/A|106,856.60|
| 1/15/1979|    216.9|133.8|   42,

In [7]:
 cols=csvFile.columns

In [8]:
for col in cols:
    print(col)

Date
US_dollar
Euro
Japanese_yen
Vietnamese_dong
Korean_won


In [9]:
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType

index =1
while (index<len(cols)):
    csvFile = csvFile.withColumn(cols[index], regexp_replace(cols[index], '#N/A', '0'))
    csvFile = csvFile.withColumn(cols[index], regexp_replace(cols[index], ',', ''))
    csvFile = csvFile.withColumn(cols[index], csvFile[cols[index]].cast('float'))
    index=index+1


In [10]:
csvFile.printSchema()

root
 |-- Date: string (nullable = true)
 |-- US_dollar: float (nullable = true)
 |-- Euro: float (nullable = true)
 |-- Japanese_yen: float (nullable = true)
 |-- Vietnamese_dong: float (nullable = true)
 |-- Korean_won: float (nullable = true)



In [11]:
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType


# Setting an user define function:
# This function converts the string cell into a date:
func =  udf (lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())

csvFile = csvFile.withColumn('Date', func(col('Date')))

csvFile.printSchema()

root
 |-- Date: date (nullable = true)
 |-- US_dollar: float (nullable = true)
 |-- Euro: float (nullable = true)
 |-- Japanese_yen: float (nullable = true)
 |-- Vietnamese_dong: float (nullable = true)
 |-- Korean_won: float (nullable = true)



In [12]:
csvFile.show()


+----------+---------+-----+------------+---------------+----------+
|      Date|US_dollar| Euro|Japanese_yen|Vietnamese_dong|Korean_won|
+----------+---------+-----+------------+---------------+----------+
|1978-12-29|    226.0|137.1|         0.0|            0.0|       0.0|
|1979-01-01|    226.0|137.1|         0.0|            0.0|       0.0|
|1979-01-02|    226.8|137.3|     43164.9|            0.0|  107470.0|
|1979-01-03|    218.6|134.0|     43717.9|            0.0|  108027.4|
|1979-01-04|    223.2|136.8|     43674.9|            0.0|  108602.5|
|1979-01-05|    225.5|138.4|     44582.5|            0.0|  110510.4|
|1979-01-08|    223.1|136.4|     44436.2|            0.0|  110356.3|
|1979-01-09|    224.0|137.3|     44045.6|            0.0|  109248.4|
|1979-01-10|    220.7|135.5|     43366.4|            0.0|  108108.3|
|1979-01-11|    220.7|135.9|     43770.6|            0.0|  108771.7|
|1979-01-12|    217.6|134.1|     42837.1|            0.0|  106856.6|
|1979-01-15|    216.9|133.8|     4

In [13]:
csvFile.write.format("csv").mode("overwrite").option("header", "true")\
.save("hdfs://namenode:9000/gold/gold.csv")