# Projeto

Vamos praticar rapidamente com suas novas habilidades do Spark DataFrame, serão feitas algumas perguntas básicas sobre alguns dados do mercado de ações, neste caso, ações do Walmart dos anos 2012-2017. 


#### Use o arquivo walmart_stock.csv para responder e concluir as tarefas abaixo!

#### Inicie uma sessão simples do Spark

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("walmart").getOrCreate()

#### Carregando o arquivo CSV do Walmart Stock e faça o Spark inferir os tipos de dados.

In [2]:
df = spark.read.csv('walmart_stock.csv',header=True,inferSchema=True)

#### nomes das colunas

In [3]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

#### Aparência do esquema

In [4]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



#### 5 primeiras colunas.

In [5]:
# Não precisava estritamente de um loop for, poderia ter apenas então head ()
for row in df.head(5):
    print(row)
    print('\n')

Row(Date=datetime.datetime(2012, 1, 2, 23, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)


Row(Date=datetime.datetime(2012, 1, 3, 23, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)


Row(Date=datetime.datetime(2012, 1, 4, 23, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)


Row(Date=datetime.datetime(2012, 1, 5, 23, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)


Row(Date=datetime.datetime(2012, 1, 8, 23, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)




#### Use describe() to learn about the DataFrame.

In [6]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

(http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast)

In [8]:
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [8]:
from pyspark.sql.functions import format_number

In [None]:
result = df.describe()
result.select(result['summary'],
              format_number(result['Open'].cast('float'),2).alias('Open'),
              format_number(result['High'].cast('float'),2).alias('High'),
              format_number(result['Low'].cast('float'),2).alias('Low'),
              format_number(result['Close'].cast('float'),2).alias('Close'),
              result['Volume'].cast('int').alias('Volume')
             ).show()

#### Criando um novo dataframe com uma coluna chamada HV Ratio que é a relação entre o preço alto e o volume de ações negociadas em um dia.

In [10]:
df2 = df.withColumn("HV Ratio",df["High"]/df["Volume"])#.show()
# df2.show()
df2.select('HV Ratio').show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



#### Em que dia houve a alta do preço?

In [11]:
# Não precisava realmente fazer tanta indexação
# Poderia apenas ter mostrado a linha inteira
df.orderBy(df["High"].desc()).head(1)[0][0]

datetime.datetime(2015, 1, 12, 23, 0)

#### Qual é a média da coluna Fechar?

In [12]:
# Também poderia ter obtido isso em describe ()
from pyspark.sql.functions import mean
df.select(mean("Close")).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



#### Qual é o máximo e mínimo da coluna Volume?

In [13]:
# Também poderia ter usado descrever
from pyspark.sql.functions import max,min

In [14]:
df.select(max("Volume"),min("Volume")).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



#### Por quantos dias o fechamento foi inferior a 60 dólares?

In [15]:
df.filter("Close < 60").count()

81

In [16]:
df.filter(df['Close'] < 60).count()

81

In [17]:
from pyspark.sql.functions import count
result = df.filter(df['Close'] < 60)
result.select(count('Close')).show()

+------------+
|count(Close)|
+------------+
|          81|
+------------+



#### Qual a porcentagem do tempo em que a Alta foi superior a 80 dólares?
#### Em outras palavras, (Número de dias altos> 80) / (Total de dias no conjunto de dados)

In [18]:
# 9,14 por cento das vezes era mais de 80
# Muitas maneiras de fazer isso
(df.filter(df["High"]>80).count()*1.0/df.count())*100

9.141494435612083

#### Qual é a correlação de Pearson entre alto e volume?
#### [Hint](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameStatFunctions.corr)

In [19]:
from pyspark.sql.functions import corr
df.select(corr("High","Volume")).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



#### Qual é o máximo máximo por ano?

In [20]:
from pyspark.sql.functions import year
yeardf = df.withColumn("Year",year(df["Date"]))

In [21]:
max_df = yeardf.groupBy('Year').max()

In [22]:
# 2015
max_df.select('Year','max(High)').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



#### Qual é o fechamento médio de cada mês do calendário?
#### Ou seja, ao longo de todos os anos, qual é o preço médio de fechamento de janeiro, fevereiro, março, etc ... Seu resultado terá um valor para cada um desses meses.

In [23]:
from pyspark.sql.functions import month
monthdf = df.withColumn("Month",month("Date"))
monthavgs = monthdf.select("Month","Close").groupBy("Month").mean()
monthavgs.select("Month","avg(Close)").orderBy('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

