In [79]:
from __future__ import absolute_import, division, print_function

import collections
import numpy as np
from six.moves import range
import tensorflow as tf
import tensorflow_federated as tff
import tempfile
import os
import six

import json
import pandas as pd
import _pickle as pickle

tf.enable_resource_variables()

In [21]:
##1. read raw data
bg1_path = 'data/20396154_entries.json'
bg2_path = 'data/99296581_entries.json'
df_bg1 = pd.read_json(bg1_path)
df_bg2 = pd.read_json(bg2_path)

In [115]:
##2. first step cleaning, remove unrelated information
def prefed_process(df_bg, client_id, window):
    df_bg = df_bg[['date', 'sgv']]    
    ret = df_bg.set_index('date').sort_index()
    in_start = 0
    fed = []
    for i in range(len(df_bg)):
        in_end = in_start + window
        if in_end < len(df_bg):
            fed.append((client_id, df_bg['sgv'][in_start:in_end].values.tolist()))
        in_start += 1 
    
    return fed

In [116]:
fed1 = prefed_process(df_bg1,'20396154',24)
fed2 = prefed_process(df_bg2,'99296581',24)

In [117]:
print(len(fed2))

55441


In [118]:
fed = fed1+fed2

In [28]:
def _create_example(features):
  """Convert a tuple of features to a tf.Example."""
  output_features = collections.OrderedDict()
  for i, feature in enumerate(features):
    if isinstance(feature, int):
      output_features[str(i)] = tf.train.Feature(
          int64_list=tf.train.Int64List(value=[feature]))
    elif isinstance(feature, float):
      output_features[str(i)] = tf.train.Feature(
          float_list=tf.train.FloatList(value=[feature]))
    elif isinstance(feature, list):
      output_features[str(i)] = tf.train.Feature(
          float_list=tf.train.FloatList(value=feature))
    else:
      # This is hit if the unittest is updated with unknown types, not an error
      # in the object under test. Extend the unittest capabilities to fix.
      raise NotImplementedError('Cannot handle feature type [%s]' %
                                type(feature))
  return tf.train.Example(features=tf.train.Features(
      feature=output_features)).SerializeToString()

In [70]:
class FakeUserData(object):
  """Container object that creates fake per-user data.
  Using the fake test data, create temporary per-user TFRecord files used for
  the test. Convert each feature-tuple to a `tf.Example` protocol buffer message
  and serialize it to the per-user file.
  """

  def __init__(self, test_data, temp_dir):
    """Construct a FakePerUseData object.
    Args:
      test_data: A list of tuples whose first element is the client ID and all
        subsequent elements are training example features.
      temp_dir: The path to the directory to store temporary per-user files.
    Returns:
      A dict of client IDs to string file paths to TFRecord files.
    """
    writers = {}
    client_file_dict = {}
    for example in test_data:
      client_id, features = example[0], example[1:]
      writer = writers.get(client_id)
      if writer is None:
        fd, path = tempfile.mkstemp(suffix=client_id, dir=temp_dir)
        # close the pre-opened file descriptor immediately to avoid leaking.
        os.close(fd)
        client_file_dict[client_id] = path
        writer = tf.python_io.TFRecordWriter(path=path)
        writers[client_id] = writer
      writer.write(_create_example(features))
    for writer in six.itervalues(writers):
      writer.close()
    self._client_data_file_dict = client_file_dict

  def create_test_dataset_fn(self, client_id):
    client_path = self._client_data_file_dict[client_id]
    features = {
        '0': tf.FixedLenFeature(shape=[], dtype=tf.int64),
        '1': tf.FixedLenFeature(shape=[24], dtype=tf.float32),
    }

    def parse_example(e):
      feature_dict = tf.parse_single_example(serialized=e, features=features)
      return tuple(feature_dict[k] for k in sorted(six.iterkeys(feature_dict)))

    return tf.data.TFRecordDataset(client_path).map(parse_example)

  @property
  def client_ids(self):
    return list(self._client_data_file_dict.keys())


In [74]:
temp_dir = tempfile.mkdtemp()

In [120]:
fake_user_data = FakeUserData(fed, temp_dir)

In [121]:
fake_user_data.client_ids

['20396154', '99296581']

In [11]:
@tff.federated_computation
def hello_world():
  return 'Hello, World!'


hello_world()

'Hello, World!'

In [12]:
federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)

In [14]:
str(federated_float_on_clients.placement)

'CLIENTS'

In [15]:
@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(sensor_readings):
  return tff.federated_mean(sensor_readings)

In [16]:
str(get_average_temperature.type_signature)

'({float32}@CLIENTS -> float32@SERVER)'

In [17]:
get_average_temperature([68.5, 70.3, 69.8])

69.53333

In [18]:
@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(sensor_readings):

  print ('Getting traced, the argument is "{}".'.format(
      type(sensor_readings).__name__))

  return tff.federated_mean(sensor_readings)

Getting traced, the argument is "ValueImpl".
