Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
fazlan-nazeem committed Jul 2, 2018
1 parent 0914451 commit d7c929f
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 78 deletions.
Expand Up @@ -26,7 +26,7 @@
import org.wso2.analytics.apim.file.adapter.exception.FileBasedAnalyticsException;
import org.wso2.analytics.apim.file.adapter.internal.ds.FileEventAdapterServiceValueHolder;
import org.wso2.analytics.apim.file.adapter.util.FileEventAdapterConstants;
import org.wso2.analytics.apim.file.adapter.util.UsagePublisherUtils;
import org.wso2.analytics.apim.file.adapter.util.FileDataRetrieverUtil;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Event;

Expand Down Expand Up @@ -111,9 +111,9 @@ private void publishEvents() {
try {
adapter.getInputEventAdapterListener().onEvent(
new Event(streamId, Long.parseLong(timeStamp),
(Object[]) UsagePublisherUtils.createMetaData(metaData),
(Object[]) UsagePublisherUtils.createMetaData(correlationData),
UsagePublisherUtils.createPayload(streamId, payloadData)));
(Object[]) FileDataRetrieverUtil.createMetaData(metaData),
(Object[]) FileDataRetrieverUtil.createMetaData(correlationData),
FileDataRetrieverUtil.createPayload(streamId, payloadData)));
} catch (Exception e) {
log.warn("Error occurred while publishing event : " + Arrays.toString(elements), e);
}
Expand Down
Expand Up @@ -37,7 +37,6 @@ public FileDataRetrieverThreadFactory() {
namePrefix = "UsagePublisher-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
}


@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
Expand Down
Expand Up @@ -74,6 +74,7 @@ private void readFileFromDatabase() {
String usagePublishFrequency = System
.getProperty(FileEventAdapterConstants.UPLOADED_USAGE_PUBLISH_FREQUENCY_PROPERTY);
if(StringUtils.isEmpty(usagePublishFrequency)) {
log.debug("Default usage publishing frequency will be used");
usagePublishFrequency = FileEventAdapterConstants.DEFAULT_UPLOADED_USAGE_PUBLISH_FREQUENCY;
}
timer.schedule(usagePublisherTask, 1000, Long.parseLong(usagePublishFrequency));
Expand Down
Expand Up @@ -37,7 +37,7 @@
import java.util.List;

/**
* This class contains methods for persisting File Upload information
* This class contains methods DB access for FileEventAdapter
*/
public class FIleEventAdapterDAO {

Expand Down
Expand Up @@ -19,7 +19,7 @@
package org.wso2.analytics.apim.file.adapter.dto;

/**
* This class represents a Uploaded File
* This class represents an uploaded file
*/
public class UploadedFileInfoDTO {
private String tenantDomain;
Expand Down
Expand Up @@ -18,87 +18,33 @@

package org.wso2.analytics.apim.file.adapter.util;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.wso2.analytics.apim.file.adapter.FileDataRetriever;
import org.wso2.analytics.apim.file.adapter.exception.FileBasedAnalyticsException;
import org.wso2.analytics.apim.file.adapter.internal.ds.FileEventAdapterServiceValueHolder;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.event.stream.core.EventStreamService;
import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationException;
import org.wso2.carbon.utils.CarbonUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* Util Class for Usage Publishing module
* Util Class for FileDataRetriever
*/
public class UsagePublisherUtils {
public class FileDataRetrieverUtil {

private static volatile Map<String, JSONArray> streamDefinitions = null;

// public static Map<String, JSONArray> getStreamDefinitions() throws FileBasedAnalyticsException {
// if (streamDefinitions == null || streamDefinitions.size() < 6) {
// synchronized (FileDataRetriever.class) {
// if (streamDefinitions == null || streamDefinitions.size() < 6) {
// String streamDirectoryPath = CarbonUtils.getCarbonConfigDirPath() + File.separator
// + FileEventAdapterConstants.STREAM_DEFINITIONS_DIRECTORY;
// Collection<File> files = FileUtils.listFiles(new File(streamDirectoryPath), null, false);
// if (!(files.size() < 6)) {
// streamDefinitions = new HashMap<>();
// for (File file : files) {
// FileInputStream fileInputStream = null;
// InputStreamReader inputStreamReader = null;
// BufferedReader bufferedReader = null;
// try {
// fileInputStream = new FileInputStream(file);
// inputStreamReader = new InputStreamReader(fileInputStream, StandardCharsets.UTF_8);
// bufferedReader = new BufferedReader(inputStreamReader);
// String readLine;
// StringBuilder jsonStr = new StringBuilder();
// while ((readLine = bufferedReader.readLine()) != null) {
// jsonStr.append(readLine);
// }
// JSONParser jsonParser = new JSONParser();
// JSONObject jsonObject = (JSONObject) jsonParser.parse(jsonStr.toString());
// String key = jsonObject.get("name") + ":"
// + jsonObject.get("version");
// streamDefinitions.put(key, (JSONArray) jsonObject.get("payloadData"));
// } catch (IOException | ParseException e) {
// throw new FileBasedAnalyticsException("Error occurred while reading " + file.getName(), e);
// } finally {
// IOUtils.closeQuietly(fileInputStream);
// IOUtils.closeQuietly(inputStreamReader);
// IOUtils.closeQuietly(bufferedReader);
// }
// }
// } else {
// throw new FileBasedAnalyticsException("Steam Definitions not found.");
// }
// }
// }
// }
// return streamDefinitions;
//
// }

private static String[] getStreamIdList() {
String[] streamIdList = new String[3];
streamIdList[0] = "org.wso2.apimgt.statistics.request:1.1.0";
streamIdList[1] = "org.wso2.apimgt.statistics.response:1.1.0";
streamIdList[2] = "org.wso2.apimgt.statistics.execution.time:1.0.0";
streamIdList[0] = FileEventAdapterConstants.REQUEST_STREAM_ID;
streamIdList[1] = FileEventAdapterConstants.RESPONSE_STREAM_ID;
streamIdList[2] = FileEventAdapterConstants.EXECUTION_STREAM_ID;
streamIdList[3] = FileEventAdapterConstants.FAULT_STREAM_ID;
streamIdList[4] = FileEventAdapterConstants.THROTTLE_STREAM_ID;
return streamIdList;
}

Expand Down Expand Up @@ -170,13 +116,4 @@ public static Object getPayloadObject(String type, String string) throws Excepti
}
}

public static String getUploadedFileDirPath(String tenantDomain, String tempDirName) {
//Temporary directory is used for keeping the uploaded files
// i.e [APIUsageFileLocation]/api-usage-data/tenantDomain/tvtzC
String storageLocation = System.getProperty("APIUsageFileLocation");
return ((storageLocation != null && !storageLocation.isEmpty())
? storageLocation : CarbonUtils.getCarbonHome())
+ File.separator + FileEventAdapterConstants.API_USAGE_OUTPUT_DIRECTORY + File.separator
+ tenantDomain + File.separator + tempDirName;
}
}
Expand Up @@ -88,7 +88,7 @@ public final class FileEventAdapterConstants {

public static final String FAULT_STREAM_ID = "org.wso2.apimgt.statistics.fault:1.0.0";

public static final String THROTTLE_STREAM_ID = "org.wso2.apimgt.statistics.throttle:1.1.0";
public static final String THROTTLE_STREAM_ID = "org.wso2.apimgt.statistics.throttle:1.0.0";

}

0 comments on commit d7c929f

Please sign in to comment.