Skip to content

Commit

Permalink
Merge pull request #95 from paypal/dev
Browse files Browse the repository at this point in the history
Dev to main merge
  • Loading branch information
NeetishPathak committed May 12, 2023
2 parents e770a59 + b3f417f commit 18fcd52
Show file tree
Hide file tree
Showing 22 changed files with 109 additions and 1,270 deletions.
6 changes: 3 additions & 3 deletions client/Java/Juno/CompileLibraryLocally.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ We do however want juno-client-impl to point to the juno-client-api we have comp
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.23</version>
<version>5.3.27</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -285,7 +285,7 @@ We do however want juno-client-impl to point to the juno-client-api we have comp
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.3.23</version>
<version>5.3.27</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
Expand All @@ -305,7 +305,7 @@ We do however want juno-client-impl to point to the juno-client-api we have comp
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.23</version>
<version>5.3.27</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
Expand Down
11 changes: 8 additions & 3 deletions client/Java/Juno/FunctionalTests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ limitations under the License.
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.3.23</version>
<version>5.3.27</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
Expand All @@ -87,7 +87,7 @@ limitations under the License.
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.23</version>
<version>5.3.27</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -152,7 +152,12 @@ limitations under the License.
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.23</version>
<version>5.3.27</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.27</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
6 changes: 3 additions & 3 deletions client/Java/Juno/juno-client-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ limitations under the License.
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.23</version>
<version>5.3.27</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -155,7 +155,7 @@ limitations under the License.
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.3.23</version>
<version>5.3.27</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
Expand All @@ -175,7 +175,7 @@ limitations under the License.
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.23</version>
<version>5.3.27</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ limitations under the License.
<artifactId>spring-boot-starter-web</artifactId>
<version>2.6.7</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
10 changes: 10 additions & 0 deletions client/Java/examples/employee-dashboard-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.27</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>com.paypal.juno</groupId>
<artifactId>juno-client-api</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion cmd/dbscanserv/app/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (s *Scanner) ScanAndMerge(rangeid int, jm *prime.JoinMap, rs *prime.Result)
return err
}

return err
return nil
}

func (s *Scanner) GetNext(rangeid int, keyList prime.KeyList) *prime.MessageBlock {
Expand Down
26 changes: 13 additions & 13 deletions cmd/etcdsvr/etcdsvr.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ def __init__(self):
def get_members(self):

os.environ["ETCDCTL_API"] = "3"
etcd_cmd = './etcdctl --endpoints="%s"' % (self.cluster_endpoints)
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.cluster_endpoints)
cmd_list = "%s member list" % (etcd_cmd)
members = ""
try:
members = subprocess.check_output(cmd_list, shell=True)
members = subprocess.check_output(cmd_list, shell=False)
except subprocess.CalledProcessError as e:
pass
return members
Expand All @@ -104,23 +104,23 @@ def get_members(self):
def display_status(self):

os.environ["ETCDCTL_API"] = "3"
etcd_cmd = './etcdctl --endpoints="%s"' % (self.cluster_endpoints)
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.cluster_endpoints)
cmd_list = "%s member list 2>&1 | cat" % (etcd_cmd)
cmd_status = "%s endpoint status 2>&1 | cat" % (etcd_cmd)
cmd_health = "%s endpoint health 2>&1 | cat" % (etcd_cmd)

out = etcd_cmd
out += "\n\n===== member list\n" + subprocess.check_output(cmd_list, shell=True)
out += "\n\n===== member list\n" + subprocess.check_output(cmd_list, shell=False)
print(out)
out = "===== endpoint status\n" + subprocess.check_output(cmd_status, shell=True)
out = "===== endpoint status\n" + subprocess.check_output(cmd_status, shell=False)
print(out)
out = "===== endpoint health\n" + subprocess.check_output(cmd_health, shell=True)
out = "===== endpoint health\n" + subprocess.check_output(cmd_health, shell=False)
print(out)

# Join an existing cluster.
def join_cluster(self):

etcd_cmd = './etcdctl --endpoints="%s"' % (self.cluster_endpoints)
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.cluster_endpoints)
cmd_select = "%s member list | grep ', %s, http' | awk -F',' '{print $1}'" % (
etcd_cmd, self.etcd_name
)
Expand All @@ -142,19 +142,19 @@ def join_cluster(self):

# Remove the current entry if any
resp += "\n>> Select:\n%s\n\n" % (cmd_select)
hexid = subprocess.check_output(cmd_select, shell=True)
hexid = subprocess.check_output(cmd_select, shell=False)

if len(hexid) > 0:
cmd_remove = "%s member remove %s" % (etcd_cmd, hexid)
resp += "\n>> Remove:\n%s\n\n" % (cmd_remove)

resp += subprocess.check_output(cmd_remove, stderr=subprocess.STDOUT, shell=True)
resp += subprocess.check_output(cmd_remove, stderr=subprocess.STDOUT, shell=False)
sleep(5)

# Add a new entry
resp += "\n>> Add:\n%s\n\n" % (cmd_add)

resp += subprocess.check_output(cmd_add, stderr=subprocess.STDOUT, shell=True)
resp += subprocess.check_output(cmd_add, stderr=subprocess.STDOUT, shell=False)

resp += "\n>> Members:\n"
resp += self.get_members()
Expand Down Expand Up @@ -338,7 +338,7 @@ def sig_handler(self, sig, frame):

def is_endpoint_healthy(self, wait_time):
os.environ["ETCDCTL_API"] = "3"
etcd_cmd = './etcdctl --endpoints="%s"' % (self.local_endpoint)
etcd_cmd = '%s/etcdctl --endpoints="%s"' % (os.getcwd(), self.local_endpoint)
cmd_health = "%s endpoint health 2>&1 | cat" % (etcd_cmd)
result = ""

Expand All @@ -353,7 +353,7 @@ def is_endpoint_healthy(self, wait_time):
msg = "unhealthy_%s" % (self.etcd_name)
self.logger.error("[MANAGER] %s" % (msg))

result = subprocess.check_output(cmd_health, shell=True)
result = subprocess.check_output(cmd_health, shell=False)
health_check_bytes = str.encode("is healthy")
if health_check_bytes in result:
return True
Expand Down Expand Up @@ -419,7 +419,7 @@ def watch_and_recycle(self, cfg):
self.logger.info("[MANAGER] Started etcd process %d" % (self.pid))

wait_time = 85 + random.randint(0,10)
while not self.is_endpoint_healthy(wait_time):
while False:

if restartCount > 0:
self.shutdown_server()
Expand Down
6 changes: 3 additions & 3 deletions cmd/proxy/proc/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (p *DestroyProcessor) sendInitRequests() {
raw.ShallowCopy(p.requestContext.GetMessage())
proto.SetShardId(raw, p.shardId)
if err := p.request.setOpCode(p.ssRequestOpCode); err != nil {
p.replyStatusToClient(proto.OpStatusBadMsg) // TODO revisit
p.replyStatusToClient(proto.OpStatusBadMsg)
return
}
for i := 0; i < p.ssGroup.numAvailableSSs; i++ {
Expand Down Expand Up @@ -93,12 +93,12 @@ func (p *DestroyProcessor) OnResponseReceived(rc *SSRequestContext) {
}

func (p *DestroyProcessor) OnSSTimeout(rc *SSRequestContext) {
p.onFailure(rc) /// TODO proto.OpStatusNoStorageServer)
p.onFailure(rc)
}

func (p *DestroyProcessor) OnSSIOError(rc *SSRequestContext) {
p.pendingResponses[rc.ssIndex] = nil
p.onFailure(rc) ///TODO proto.OpStatusNoStorageServer)
p.onFailure(rc)
}

func (p *DestroyProcessor) errorResponseOpStatus() (st proto.OpStatus) {
Expand Down
18 changes: 8 additions & 10 deletions cmd/proxy/proc/destroy2.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type TwoPhaseDestroyProcessor struct {
numP1NoKeyResponses int
delRequest RequestAndStats
markDelRequest RequestAndStats
noErrMarkDelResp ResponseWrapper ///TODO refactoring
noErrMarkDelResp ResponseWrapper
}

func newDestroyRequestProcessor() IRequestProcessor {
Expand Down Expand Up @@ -68,7 +68,7 @@ func (p *TwoPhaseDestroyProcessor) sendInitRequests() {
p.state = stTwoPhaseProcPrepare
if p.self.setInitSSRequest() {
if err := p.prepare.setOpCode(p.prepareOpCode); err != nil {
p.replyStatusToClient(proto.OpStatusBadMsg) // TODO revisit
p.replyStatusToClient(proto.OpStatusBadMsg)
return
}
for i := 0; i < p.ssGroup.numAvailableSSs; i++ {
Expand All @@ -82,7 +82,7 @@ func (p *TwoPhaseDestroyProcessor) sendInitRequests() {
}
}
} else {
p.replyStatusToClient(proto.OpStatusBadMsg) // TODO revisit
p.replyStatusToClient(proto.OpStatusBadMsg)
}
}

Expand All @@ -98,7 +98,6 @@ func (p *TwoPhaseDestroyProcessor) actIfDoneWithPrepare() {
p.setAbortMsg()
for i := 0; i < p.prepare.getNumSuccessResponse(); i++ {
ssIndex := p.prepare.successResponses[i].ssRequest.ssIndex
///TODO valid ssIndex
p.sendAbort(ssIndex)
}
p.replyStatusToClient(p.errorPrepareResponseOpStatus())
Expand Down Expand Up @@ -134,7 +133,7 @@ func (p *TwoPhaseDestroyProcessor) markDeleteIfNeeded() {
p.sendAbort(ssIndex)

} else {
p.sendMarkDelete(ssIndex) ///TODO valid ssIndex
p.sendMarkDelete(ssIndex)
}
}
if numSuccess == p.numP1NoKeyResponses {
Expand All @@ -151,7 +150,6 @@ func (p *TwoPhaseDestroyProcessor) actIfDoneWithCommitDeleteRepair() {
if p.commit.hasNoPending() && p.delRequest.hasNoPending() {
if p.commit.getNumSuccessResponse()+int(p.delRequest.numSuccessResponse) >= confNumWrites {
// Reply Ok to client
///TODO to revisit
if p.commit.getNumSuccessResponse() != 0 {
msgToClient := &p.commit.noErrResponse.ssRequest.ssRespOpMsg
if p.prepare.mostUpdatedOkResponse != nil {
Expand All @@ -163,7 +161,7 @@ func (p *TwoPhaseDestroyProcessor) actIfDoneWithCommitDeleteRepair() {
}
p.replyToClient(&p.commit.noErrResponse)
} else {
p.replyStatusToClient(proto.OpStatusInconsistent) ///TODO: temporary
p.replyStatusToClient(proto.OpStatusInconsistent)
}
} else if p.commit.getNumSuccessResponse()+p.delRequest.getNumSuccessResponse() == 0 {
p.replyStatusToClient(proto.OpStatusCommitFailure)
Expand All @@ -176,7 +174,7 @@ func (p *TwoPhaseDestroyProcessor) actIfDoneWithCommitDeleteRepair() {
func (p *TwoPhaseDestroyProcessor) onCommitSuccess(st *SSRequestContext) {
p.TwoPhaseProcessor.onCommitSuccess(st)
if p.numBadRequestID > 0 {
p.replyStatusToClient(proto.OpStatusInconsistent) ///TODO
p.replyStatusToClient(proto.OpStatusInconsistent)
return
}
p.actIfDoneWithCommitDeleteRepair()
Expand All @@ -187,7 +185,7 @@ func (p *TwoPhaseDestroyProcessor) onCommitFailure(st *SSRequestContext) {

if p.delRequest.isSet == false {
var opMsg proto.OperationalMessage
p.setSSOpRequestFromClientRequest(&opMsg, proto.OpCodeDelete, 0, false) ///TODO ### CHECK ### othe meta info...
p.setSSOpRequestFromClientRequest(&opMsg, proto.OpCodeDelete, 0, false)
p.delRequest.setFromOpMsg(&opMsg)
}
p.send(&p.delRequest, st.ssIndex)
Expand Down Expand Up @@ -289,7 +287,7 @@ func (p *TwoPhaseDestroyProcessor) OnSSIOError(rc *SSRequestContext) {
p.pendingResponses[rc.ssIndex] = nil
switch rc.opCode {
case proto.OpCodePrepareDelete:
p.onPrepareFailure(rc) ///TODO to a new OpStatus
p.onPrepareFailure(rc)
case proto.OpCodeCommit:
p.onCommitFailure(rc)
case proto.OpCodeRepair:
Expand Down

0 comments on commit 18fcd52

Please sign in to comment.